1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::{Subtitle, SynthesisEvent};
3use anyhow::Result;
4use async_trait::async_trait;
5use base64::{Engine, prelude::BASE64_STANDARD};
6use chrono::Duration;
7use futures::{
8 SinkExt, Stream, StreamExt, future,
9 stream::{self, BoxStream, SplitSink},
10};
11use ring::hmac;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14use tokio::{
15 net::TcpStream,
16 sync::{Notify, mpsc},
17};
18use tokio_stream::wrappers::UnboundedReceiverStream;
19use tokio_tungstenite::{
20 MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
21};
22use tracing::{debug, warn};
23use urlencoding;
24use uuid::Uuid;
25
/// Host of the Tencent Cloud TTS WebSocket service; it is also part of the
/// string that gets signed in `construct_request_url`.
const HOST: &str = "tts.cloud.tencent.com";
/// Path of the one-shot endpoint, used when the text is passed in the query string.
const NON_STREAMING_PATH: &str = "/stream_ws";
/// Path of the streaming (v2) endpoint, used when text is sent later as WebSocket messages.
const STREAMING_PATH: &str = "/stream_wsv2";

/// Full-duplex WebSocket connection as returned by `connect_async`.
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
/// Write half of a split [`WsStream`], kept by the streaming client to send requests.
type WsSink = SplitSink<WsStream, Message>;
32
/// JSON command sent by the streaming client over an open WebSocket session.
#[derive(Debug, Serialize)]
struct WebSocketRequest {
    /// Session the command belongs to (the id used when building the connect URL).
    session_id: String,
    /// Per-message id; the constructors generate a random UUID for each request.
    message_id: String,
    /// Command name, e.g. `ACTION_SYNTHESIS` or `ACTION_COMPLETE`.
    action: String,
    /// Command payload: the text to synthesize, or empty for `ACTION_COMPLETE`.
    data: String,
}
40
41impl WebSocketRequest {
42 fn synthesis_action(session_id: &str, text: &str) -> Self {
43 let message_id = Uuid::new_v4().to_string();
44 Self {
45 session_id: session_id.to_string(),
46 message_id,
47 action: "ACTION_SYNTHESIS".to_string(),
48 data: text.to_string(),
49 }
50 }
51
52 fn complete_action(session_id: &str) -> Self {
53 let message_id = Uuid::new_v4().to_string();
54 Self {
55 session_id: session_id.to_string(),
56 message_id: message_id.to_string(),
57 action: "ACTION_COMPLETE".to_string(),
58 data: "".to_string(),
59 }
60 }
61}
62
63#[derive(Debug, Deserialize)]
64struct WebSocketResponse {
65 code: i32,
66 message: String,
67 r#final: i32,
68 result: WebSocketResult,
69 ready: u32,
70 heartbeat: u32,
71}
72
73#[derive(Debug, Deserialize)]
74struct WebSocketResult {
75 subtitles: Option<Vec<TencentSubtitle>>,
76}
77
/// One subtitle entry as returned by Tencent; JSON keys are PascalCase
/// (`Text`, `BeginTime`, `EndTime`, `BeginIndex`, `EndIndex`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct TencentSubtitle {
    /// Text covered by this entry.
    text: String,
    /// Start time of the entry; units not visible here (presumably milliseconds — TODO confirm).
    begin_time: u32,
    /// End time of the entry, same units as `begin_time`.
    end_time: u32,
    /// Start position of the entry within the synthesized text (presumably a character index — confirm).
    begin_index: u32,
    /// End position of the entry within the synthesized text.
    end_index: u32,
}
87
88impl From<&TencentSubtitle> for Subtitle {
89 fn from(subtitle: &TencentSubtitle) -> Self {
90 Subtitle::new(
91 subtitle.text.clone(),
92 subtitle.begin_time,
93 subtitle.end_time,
94 subtitle.begin_index,
95 subtitle.end_index,
96 )
97 }
98}
99
100fn construct_request_url(option: &SynthesisOption, session_id: &str, text: Option<&str>) -> String {
104 let streaming = text.is_none();
105 let action = if !streaming {
106 "TextToStreamAudioWS"
107 } else {
108 "TextToStreamAudioWSv2"
109 };
110 let secret_id = option.secret_id.clone().unwrap_or_default();
111 let secret_key = option.secret_key.clone().unwrap_or_default();
112 let app_id = option.app_id.clone().unwrap_or_default();
113 let volume = option.volume.unwrap_or(0);
114 let speed = option.speed.unwrap_or(0.0);
115 let codec = option.codec.clone().unwrap_or_else(|| "pcm".to_string());
116 let sample_rate = option.samplerate.unwrap_or(16000);
117 let now = chrono::Utc::now();
118 let timestamp = now.timestamp();
119 let tomorrow = now + Duration::days(1);
120 let expired = tomorrow.timestamp();
121 let expired_str = expired.to_string();
122 let sample_rate_str = sample_rate.to_string();
123 let speed_str = speed.to_string();
124 let timestamp_str = timestamp.to_string();
125 let volume_str = volume.to_string();
126 let voice_type = option
127 .speaker
128 .clone()
129 .unwrap_or_else(|| "101001".to_string());
130 let mut query_params = vec![
131 ("Action", action),
132 ("AppId", &app_id),
133 ("SecretId", &secret_id),
134 ("Timestamp", ×tamp_str),
135 ("Expired", &expired_str),
136 ("SessionId", &session_id),
137 ("VoiceType", &voice_type),
138 ("Volume", &volume_str),
139 ("Speed", &speed_str),
140 ("SampleRate", &sample_rate_str),
141 ("Codec", &codec),
142 ("EnableSubtitle", "true"),
143 ];
144
145 if let Some(text) = text {
146 query_params.push(("Text", text));
147 }
148
149 query_params.sort_by(|a, b| a.0.cmp(b.0));
151
152 let query_string = query_params
154 .iter()
155 .map(|(k, v)| format!("{}={}", k, v))
156 .collect::<Vec<_>>()
157 .join("&");
158
159 let path = if streaming {
160 STREAMING_PATH
161 } else {
162 NON_STREAMING_PATH
163 };
164
165 let string_to_sign = format!("GET{}{}?{}", HOST, path, query_string);
166 let key = hmac::Key::new(hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, secret_key.as_bytes());
168 let tag = hmac::sign(&key, string_to_sign.as_bytes());
169 let signature: String = BASE64_STANDARD.encode(tag.as_ref());
170 let encoded_query_string = query_params
172 .iter()
173 .map(|(k, v)| format!("{}={}", k, urlencoding::encode(v)))
174 .collect::<Vec<_>>()
175 .join("&");
176
177 format!(
178 "wss://{}{}?{}&Signature={}",
179 HOST,
180 path,
181 encoded_query_string,
182 urlencoding::encode(&signature)
183 )
184}
185
186fn ws_to_event_stream<T>(
190 ws_stream: T,
191) -> impl Stream<Item = Result<SynthesisEvent>> + Send + 'static
192where
193 T: Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
194 + Send
195 + Unpin
196 + 'static,
197{
198 let notify = Arc::new(Notify::new());
199 let notify_clone = notify.clone();
200 ws_stream
201 .take_until(notify.notified_owned())
202 .filter_map(move |message| {
203 let notify = notify_clone.clone();
204 async move {
205 match message {
206 Ok(Message::Binary(data)) => Some(Ok(SynthesisEvent::AudioChunk(data))),
207 Ok(Message::Text(text)) => {
208 let response: WebSocketResponse =
209 serde_json::from_str(&text).expect("Tencent TTS API changed!");
210
211 if response.code != 0 {
212 notify.notify_one();
213 return Some(Err(anyhow::anyhow!(
214 "Tencent TTS error, code: {}, message: {}",
215 response.code,
216 response.message
217 )));
218 }
219
220 if response.heartbeat == 1 {
221 return None;
222 }
223
224 if let Some(subtitles) = response.result.subtitles {
225 let subtitles: Vec<Subtitle> =
226 subtitles.iter().map(Into::into).collect();
227 return Some(Ok(SynthesisEvent::Subtitles(subtitles)));
228 }
229
230 if response.r#final == 1 {
232 notify.notify_one();
233 return Some(Ok(SynthesisEvent::Finished));
234 }
235
236 None
237 }
238 Ok(Message::Close(_)) => {
239 notify.notify_one();
240 warn!("Tencent TTS closed by remote");
241 None
242 }
243 Err(e) => {
244 notify.notify_one();
245 Some(Err(anyhow::anyhow!("Tencent TTS websocket error: {:?}", e)))
246 }
247 _ => None,
248 }
249 }
250 })
251}
252
253pub struct RealTimeClient {
257 option: SynthesisOption,
258 tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
260}
261
262impl RealTimeClient {
263 fn new(option: SynthesisOption) -> Self {
264 Self { option, tx: None }
265 }
266}
267
268#[async_trait]
269impl SynthesisClient for RealTimeClient {
270 fn provider(&self) -> SynthesisType {
271 SynthesisType::TencentCloud
272 }
273
274 async fn start(
275 &mut self,
276 ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
277 let (tx, rx) = mpsc::unbounded_channel();
280 self.tx = Some(tx);
281 let client_option = self.option.clone();
282 let max_concurrent_tasks = client_option.max_concurrent_tasks.unwrap_or(1);
283 let stream = UnboundedReceiverStream::new(rx)
284 .flat_map_unordered(max_concurrent_tasks, move |(text, cmd_seq, option)| {
285 let session_id = Uuid::new_v4().to_string();
287 let option = client_option.merge_with(option);
288 let url = construct_request_url(&option, &session_id, Some(&text));
289 stream::once(connect_async(url))
290 .flat_map(move |res| match res {
291 Ok((ws_stream, _)) => ws_to_event_stream(ws_stream).boxed(),
292 Err(e) => stream::once(future::ready(Err(e.into()))).boxed(),
293 })
294 .map(move |x| (cmd_seq, x))
295 .boxed()
296 })
297 .boxed();
298 Ok(stream)
299 }
300
301 async fn synthesize(
302 &mut self,
303 text: &str,
304 cmd_seq: Option<usize>,
305 option: Option<SynthesisOption>,
306 ) -> Result<()> {
307 if let Some(tx) = &self.tx {
308 tx.send((text.to_string(), cmd_seq, option))?;
309 } else {
310 return Err(anyhow::anyhow!("TencentCloud TTS: missing client sender"));
311 };
312
313 Ok(())
314 }
315
316 async fn stop(&mut self) -> Result<()> {
317 self.tx.take();
318 Ok(())
319 }
320}
321
322struct StreamingClient {
326 session_id: String,
327 option: SynthesisOption,
328 sink: Option<WsSink>,
329}
330
331impl StreamingClient {
332 pub fn new(option: SynthesisOption) -> Self {
333 let session_id = Uuid::new_v4().to_string();
334 Self {
335 session_id,
336 option,
337 sink: None,
338 }
339 }
340}
341
342impl StreamingClient {
343 async fn connect(&self) -> Result<WsStream> {
344 let url = construct_request_url(&self.option, &self.session_id, None);
345 let (mut ws_stream, _) = connect_async(url).await?;
346 while let Some(message) = ws_stream.next().await {
348 match message {
349 Ok(Message::Text(text)) => {
350 let response = serde_json::from_str::<WebSocketResponse>(&text)?;
351 if response.ready == 1 {
352 debug!("TencentCloud TTS streaming client connected");
353 return Ok(ws_stream);
354 }
355
356 if response.code != 0 {
357 return Err(anyhow::anyhow!(
358 "TencentCloud TTS streaming client connecting failed: code: {}, message: {}",
359 response.code,
360 response.message
361 ));
362 }
363 }
364 Ok(Message::Close { .. }) => {
365 return Err(anyhow::anyhow!(
366 "TencentCloud TTS streaming client connecting failed: closed by remote"
367 ));
368 }
369 Err(e) => {
370 return Err(anyhow::anyhow!(
371 "TencentCloud TTS streaming client connecting failed: websocket error: {}",
372 e
373 ));
374 }
375 _ => {}
376 }
377 }
378
379 Err(anyhow::anyhow!(
380 "TencentCloud TTS streaming client connecting failed"
381 ))
382 }
383}
384
385#[async_trait]
386impl SynthesisClient for StreamingClient {
387 fn provider(&self) -> SynthesisType {
388 SynthesisType::TencentCloud
389 }
390
391 async fn start(
392 &mut self,
393 ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
394 let stream = self.connect().await?;
395 let (ws_sink, ws_stream) = stream.split();
396 self.sink = Some(ws_sink);
397 let stream = ws_to_event_stream(ws_stream)
398 .map(move |event| (None, event))
399 .boxed();
400 Ok(stream)
401 }
402
403 async fn synthesize(
404 &mut self,
405 text: &str,
406 _cmd_seq: Option<usize>,
407 _option: Option<SynthesisOption>,
408 ) -> Result<()> {
409 if let Some(sink) = &mut self.sink {
410 let request = WebSocketRequest::synthesis_action(&self.session_id, &text);
411 let data = serde_json::to_string(&request)?;
412 sink.send(Message::Text(data.into())).await?;
413
414 Ok(())
415 } else {
416 Err(anyhow::anyhow!("TencentCloud TTS streaming: missing sink"))
417 }
418 }
419
420 async fn stop(&mut self) -> Result<()> {
421 if let Some(sink) = &mut self.sink {
422 let request = WebSocketRequest::complete_action(&self.session_id);
423 let data = serde_json::to_string(&request)?;
424 sink.send(Message::Text(data.into())).await?;
425 }
426
427 Ok(())
428 }
429}
430
431pub struct TencentCloudTtsClient;
432
433impl TencentCloudTtsClient {
434 pub fn create(streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
435 if streaming {
436 Ok(Box::new(StreamingClient::new(option.clone())))
437 } else {
438 Ok(Box::new(RealTimeClient::new(option.clone())))
439 }
440 }
441}