1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::{Subtitle, SynthesisEvent};
3use anyhow::Result;
4use async_trait::async_trait;
5use base64::{Engine, prelude::BASE64_STANDARD};
6use chrono::Duration;
7use futures::{
8 SinkExt, Stream, StreamExt, future,
9 stream::{self, BoxStream, SplitSink},
10};
11use ring::hmac;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14use tokio::{
15 net::TcpStream,
16 sync::{Notify, mpsc},
17};
18use tokio_stream::wrappers::UnboundedReceiverStream;
19use tokio_tungstenite::{
20 MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
21};
22use tracing::{debug, warn};
23use unic_emoji::char::is_emoji;
24use urlencoding;
25use uuid::Uuid;
26
/// Host name used both in the signed string and in the final `wss://` URL.
const HOST: &str = "tts.cloud.tencent.com";
/// Endpoint path used when the text is passed in the request URL.
const NON_STREAMING_PATH: &str = "/stream_ws";
/// Endpoint path used when no text is passed in the request URL.
const STREAMING_PATH: &str = "/stream_wsv2";
30
31type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
32type WsSink = SplitSink<WsStream, Message>;
33
34#[derive(Debug, Serialize)]
35struct WebSocketRequest {
36 session_id: String,
37 message_id: String,
38 action: String,
39 data: String,
40}
41
42impl WebSocketRequest {
43 fn synthesis_action(session_id: &str, text: &str) -> Self {
44 let message_id = Uuid::new_v4().to_string();
45 Self {
46 session_id: session_id.to_string(),
47 message_id,
48 action: "ACTION_SYNTHESIS".to_string(),
49 data: text.to_string(),
50 }
51 }
52
53 fn complete_action(session_id: &str) -> Self {
54 let message_id = Uuid::new_v4().to_string();
55 Self {
56 session_id: session_id.to_string(),
57 message_id: message_id.to_string(),
58 action: "ACTION_COMPLETE".to_string(),
59 data: "".to_string(),
60 }
61 }
62}
63
64#[derive(Debug, Deserialize)]
65struct WebSocketResponse {
66 code: i32,
67 message: String,
68 r#final: i32,
69 result: WebSocketResult,
70 ready: u32,
71 heartbeat: u32,
72}
73
74#[derive(Debug, Deserialize)]
75struct WebSocketResult {
76 subtitles: Option<Vec<TencentSubtitle>>,
77}
78
79#[derive(Debug, Deserialize)]
80#[serde(rename_all = "PascalCase")]
81pub(super) struct TencentSubtitle {
82 text: String,
83 begin_time: u32,
84 end_time: u32,
85 begin_index: u32,
86 end_index: u32,
87}
88
89impl From<&TencentSubtitle> for Subtitle {
90 fn from(subtitle: &TencentSubtitle) -> Self {
91 Subtitle::new(
92 subtitle.text.clone(),
93 subtitle.begin_time,
94 subtitle.end_time,
95 subtitle.begin_index,
96 subtitle.end_index,
97 )
98 }
99}
100
101pub fn strip_emoji_chars(text: &str) -> String {
106 text.chars()
107 .filter(|&c| c.is_ascii() || !is_emoji(c))
108 .collect()
109}
110
111fn construct_request_url(option: &SynthesisOption, session_id: &str, text: Option<&str>) -> String {
115 let streaming = text.is_none();
116 let action = if !streaming {
117 "TextToStreamAudioWS"
118 } else {
119 "TextToStreamAudioWSv2"
120 };
121 let secret_id = option.secret_id.clone().unwrap_or_default();
122 let secret_key = option.secret_key.clone().unwrap_or_default();
123 let app_id = option.app_id.clone().unwrap_or_default();
124 let volume = option.volume.unwrap_or(0);
125 let speed = option.speed.unwrap_or(0.0);
126 let codec = option.codec.clone().unwrap_or_else(|| "pcm".to_string());
127 let sample_rate = option.samplerate.unwrap_or(16000);
128 let now = chrono::Utc::now();
129 let timestamp = now.timestamp();
130 let tomorrow = now + Duration::days(1);
131 let expired = tomorrow.timestamp();
132 let expired_str = expired.to_string();
133 let sample_rate_str = sample_rate.to_string();
134 let speed_str = speed.to_string();
135 let timestamp_str = timestamp.to_string();
136 let volume_str = volume.to_string();
137 let voice_type = option
138 .speaker
139 .clone()
140 .unwrap_or_else(|| "101001".to_string());
141 let mut query_params = vec![
142 ("Action", action),
143 ("AppId", &app_id),
144 ("SecretId", &secret_id),
145 ("Timestamp", ×tamp_str),
146 ("Expired", &expired_str),
147 ("SessionId", &session_id),
148 ("VoiceType", &voice_type),
149 ("Volume", &volume_str),
150 ("Speed", &speed_str),
151 ("SampleRate", &sample_rate_str),
152 ("Codec", &codec),
153 ("EnableSubtitle", "true"),
154 ];
155
156 if let Some(text) = text {
157 query_params.push(("Text", text));
158 }
159
160 query_params.sort_by(|a, b| a.0.cmp(b.0));
162
163 let query_string = query_params
165 .iter()
166 .map(|(k, v)| format!("{}={}", k, v))
167 .collect::<Vec<_>>()
168 .join("&");
169
170 let path = if streaming {
171 STREAMING_PATH
172 } else {
173 NON_STREAMING_PATH
174 };
175
176 let string_to_sign = format!("GET{}{}?{}", HOST, path, query_string);
177 let key = hmac::Key::new(hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, secret_key.as_bytes());
179 let tag = hmac::sign(&key, string_to_sign.as_bytes());
180 let signature: String = BASE64_STANDARD.encode(tag.as_ref());
181 let encoded_query_string = query_params
183 .iter()
184 .map(|(k, v)| format!("{}={}", k, urlencoding::encode(v)))
185 .collect::<Vec<_>>()
186 .join("&");
187
188 format!(
189 "wss://{}{}?{}&Signature={}",
190 HOST,
191 path,
192 encoded_query_string,
193 urlencoding::encode(&signature)
194 )
195}
196
197fn ws_to_event_stream<T>(
201 ws_stream: T,
202) -> impl Stream<Item = Result<SynthesisEvent>> + Send + 'static
203where
204 T: Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
205 + Send
206 + Unpin
207 + 'static,
208{
209 let notify = Arc::new(Notify::new());
210 let notify_clone = notify.clone();
211 ws_stream
212 .take_until(notify.notified_owned())
213 .filter_map(move |message| {
214 let notify = notify_clone.clone();
215 async move {
216 match message {
217 Ok(Message::Binary(data)) => Some(Ok(SynthesisEvent::AudioChunk(data))),
218 Ok(Message::Text(text)) => {
219 let response: WebSocketResponse =
220 serde_json::from_str(&text).expect("Tencent TTS API changed!");
221
222 if response.code != 0 {
223 notify.notify_one();
224 return Some(Err(anyhow::anyhow!(
225 "Tencent TTS error, code: {}, message: {}",
226 response.code,
227 response.message
228 )));
229 }
230
231 if response.heartbeat == 1 {
232 return None;
233 }
234
235 if let Some(subtitles) = response.result.subtitles {
236 let subtitles: Vec<Subtitle> =
237 subtitles.iter().map(Into::into).collect();
238 return Some(Ok(SynthesisEvent::Subtitles(subtitles)));
239 }
240
241 if response.r#final == 1 {
243 notify.notify_one();
244 return Some(Ok(SynthesisEvent::Finished));
245 }
246
247 None
248 }
249 Ok(Message::Close(_)) => {
250 notify.notify_one();
251 warn!("Tencent TTS closed by remote");
252 None
253 }
254 Err(e) => {
255 notify.notify_one();
256 Some(Err(anyhow::anyhow!("Tencent TTS websocket error: {:?}", e)))
257 }
258 _ => None,
259 }
260 }
261 })
262}
263
264pub struct RealTimeClient {
268 option: SynthesisOption,
269 tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
271}
272
273impl RealTimeClient {
274 fn new(option: SynthesisOption) -> Self {
275 Self { option, tx: None }
276 }
277}
278
279#[async_trait]
280impl SynthesisClient for RealTimeClient {
281 fn provider(&self) -> SynthesisType {
282 SynthesisType::TencentCloud
283 }
284
285 async fn start(
286 &mut self,
287 ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
288 let (tx, rx) = mpsc::unbounded_channel();
291 self.tx = Some(tx);
292 let client_option = self.option.clone();
293 let max_concurrent_tasks = client_option.max_concurrent_tasks.unwrap_or(1);
294 let stream = UnboundedReceiverStream::new(rx)
295 .flat_map_unordered(max_concurrent_tasks, move |(text, cmd_seq, option)| {
296 let session_id = Uuid::new_v4().to_string();
298 let option = client_option.merge_with(option);
299 let url = construct_request_url(&option, &session_id, Some(&text));
300 stream::once(connect_async(url))
301 .flat_map(move |res| match res {
302 Ok((ws_stream, _)) => ws_to_event_stream(ws_stream).boxed(),
303 Err(e) => stream::once(future::ready(Err(e.into()))).boxed(),
304 })
305 .map(move |x| (cmd_seq, x))
306 .boxed()
307 })
308 .boxed();
309 Ok(stream)
310 }
311
312 async fn synthesize(
313 &mut self,
314 text: &str,
315 cmd_seq: Option<usize>,
316 option: Option<SynthesisOption>,
317 ) -> Result<()> {
318 if let Some(tx) = &self.tx {
319 let text = strip_emoji_chars(text);
320 tx.send((text, cmd_seq, option))?;
321 } else {
322 return Err(anyhow::anyhow!("TencentCloud TTS: missing client sender"));
323 };
324
325 Ok(())
326 }
327
328 async fn stop(&mut self) -> Result<()> {
329 self.tx.take();
330 Ok(())
331 }
332}
333
334struct StreamingClient {
338 session_id: String,
339 option: SynthesisOption,
340 sink: Option<WsSink>,
341}
342
343impl StreamingClient {
344 pub fn new(option: SynthesisOption) -> Self {
345 let session_id = Uuid::new_v4().to_string();
346 Self {
347 session_id,
348 option,
349 sink: None,
350 }
351 }
352}
353
354impl StreamingClient {
355 async fn connect(&self) -> Result<WsStream> {
356 let url = construct_request_url(&self.option, &self.session_id, None);
357 let (mut ws_stream, _) = connect_async(url).await?;
358 while let Some(message) = ws_stream.next().await {
360 match message {
361 Ok(Message::Text(text)) => {
362 let response = serde_json::from_str::<WebSocketResponse>(&text)?;
363 if response.ready == 1 {
364 debug!("TencentCloud TTS streaming client connected");
365 return Ok(ws_stream);
366 }
367
368 if response.code != 0 {
369 return Err(anyhow::anyhow!(
370 "TencentCloud TTS streaming client connecting failed: code: {}, message: {}",
371 response.code,
372 response.message
373 ));
374 }
375 }
376 Ok(Message::Close { .. }) => {
377 return Err(anyhow::anyhow!(
378 "TencentCloud TTS streaming client connecting failed: closed by remote"
379 ));
380 }
381 Err(e) => {
382 return Err(anyhow::anyhow!(
383 "TencentCloud TTS streaming client connecting failed: websocket error: {}",
384 e
385 ));
386 }
387 _ => {}
388 }
389 }
390
391 Err(anyhow::anyhow!(
392 "TencentCloud TTS streaming client connecting failed"
393 ))
394 }
395}
396
397#[async_trait]
398impl SynthesisClient for StreamingClient {
399 fn provider(&self) -> SynthesisType {
400 SynthesisType::TencentCloud
401 }
402
403 async fn start(
404 &mut self,
405 ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
406 let stream = self.connect().await?;
407 let (ws_sink, ws_stream) = stream.split();
408 self.sink = Some(ws_sink);
409 let stream = ws_to_event_stream(ws_stream)
410 .map(move |event| (None, event))
411 .boxed();
412 Ok(stream)
413 }
414
415 async fn synthesize(
416 &mut self,
417 text: &str,
418 _cmd_seq: Option<usize>,
419 _option: Option<SynthesisOption>,
420 ) -> Result<()> {
421 if let Some(sink) = &mut self.sink {
422 let text = strip_emoji_chars(text);
423 let request = WebSocketRequest::synthesis_action(&self.session_id, &text);
424 let data = serde_json::to_string(&request)?;
425 sink.send(Message::Text(data.into())).await?;
426
427 Ok(())
428 } else {
429 Err(anyhow::anyhow!("TencentCloud TTS streaming: missing sink"))
430 }
431 }
432
433 async fn stop(&mut self) -> Result<()> {
434 if let Some(sink) = &mut self.sink {
435 let request = WebSocketRequest::complete_action(&self.session_id);
436 let data = serde_json::to_string(&request)?;
437 sink.send(Message::Text(data.into())).await?;
438 }
439
440 Ok(())
441 }
442}
443
444pub struct TencentCloudTtsClient;
445
446impl TencentCloudTtsClient {
447 pub fn create(streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
448 if streaming {
449 Ok(Box::new(StreamingClient::new(option.clone())))
450 } else {
451 Ok(Box::new(RealTimeClient::new(option.clone())))
452 }
453 }
454}