active_call/synthesis/
tencent_cloud.rs

1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::{Subtitle, SynthesisEvent};
3use anyhow::Result;
4use async_trait::async_trait;
5use aws_lc_rs::hmac;
6use base64::{Engine, prelude::BASE64_STANDARD};
7use chrono::Duration;
8use futures::{
9    SinkExt, Stream, StreamExt, future,
10    stream::{self, BoxStream, SplitSink},
11};
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14use tokio::{
15    net::TcpStream,
16    sync::{Notify, mpsc},
17};
18use tokio_stream::wrappers::UnboundedReceiverStream;
19use tokio_tungstenite::{
20    MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
21};
22use tracing::{debug, warn};
23use urlencoding;
24use uuid::Uuid;
25
26const HOST: &str = "tts.cloud.tencent.com";
27const NON_STREAMING_PATH: &str = "/stream_ws";
28const STREAMING_PATH: &str = "/stream_wsv2";
29
30type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
31type WsSink = SplitSink<WsStream, Message>;
32
33#[derive(Debug, Serialize)]
34struct WebSocketRequest {
35    session_id: String,
36    message_id: String,
37    action: String,
38    data: String,
39}
40
41impl WebSocketRequest {
42    fn synthesis_action(session_id: &str, text: &str) -> Self {
43        let message_id = Uuid::new_v4().to_string();
44        Self {
45            session_id: session_id.to_string(),
46            message_id,
47            action: "ACTION_SYNTHESIS".to_string(),
48            data: text.to_string(),
49        }
50    }
51
52    fn complete_action(session_id: &str) -> Self {
53        let message_id = Uuid::new_v4().to_string();
54        Self {
55            session_id: session_id.to_string(),
56            message_id: message_id.to_string(),
57            action: "ACTION_COMPLETE".to_string(),
58            data: "".to_string(),
59        }
60    }
61}
62
63#[derive(Debug, Deserialize)]
64struct WebSocketResponse {
65    code: i32,
66    message: String,
67    r#final: i32,
68    result: WebSocketResult,
69    ready: u32,
70    heartbeat: u32,
71}
72
73#[derive(Debug, Deserialize)]
74struct WebSocketResult {
75    subtitles: Option<Vec<TencentSubtitle>>,
76}
77
78#[derive(Debug, Deserialize)]
79#[serde(rename_all = "PascalCase")]
80pub(super) struct TencentSubtitle {
81    text: String,
82    begin_time: u32,
83    end_time: u32,
84    begin_index: u32,
85    end_index: u32,
86}
87
88impl From<&TencentSubtitle> for Subtitle {
89    fn from(subtitle: &TencentSubtitle) -> Self {
90        Subtitle::new(
91            subtitle.text.clone(),
92            subtitle.begin_time,
93            subtitle.end_time,
94            subtitle.begin_index,
95            subtitle.end_index,
96        )
97    }
98}
99
100// construct request url
101// for non-streaming client, text is Some
102// session_id is used for tencent cloud tts service, not the session_id of media session
103fn construct_request_url(option: &SynthesisOption, session_id: &str, text: Option<&str>) -> String {
104    let streaming = text.is_none();
105    let action = if !streaming {
106        "TextToStreamAudioWS"
107    } else {
108        "TextToStreamAudioWSv2"
109    };
110    let secret_id = option.secret_id.clone().unwrap_or_default();
111    let secret_key = option.secret_key.clone().unwrap_or_default();
112    let app_id = option.app_id.clone().unwrap_or_default();
113    let volume = option.volume.unwrap_or(0);
114    let speed = option.speed.unwrap_or(0.0);
115    let codec = option.codec.clone().unwrap_or_else(|| "pcm".to_string());
116    let sample_rate = option.samplerate.unwrap_or(16000);
117    let now = chrono::Utc::now();
118    let timestamp = now.timestamp();
119    let tomorrow = now + Duration::days(1);
120    let expired = tomorrow.timestamp();
121    let expired_str = expired.to_string();
122    let sample_rate_str = sample_rate.to_string();
123    let speed_str = speed.to_string();
124    let timestamp_str = timestamp.to_string();
125    let volume_str = volume.to_string();
126    let voice_type = option
127        .speaker
128        .clone()
129        .unwrap_or_else(|| "101001".to_string());
130    let mut query_params = vec![
131        ("Action", action),
132        ("AppId", &app_id),
133        ("SecretId", &secret_id),
134        ("Timestamp", &timestamp_str),
135        ("Expired", &expired_str),
136        ("SessionId", &session_id),
137        ("VoiceType", &voice_type),
138        ("Volume", &volume_str),
139        ("Speed", &speed_str),
140        ("SampleRate", &sample_rate_str),
141        ("Codec", &codec),
142        ("EnableSubtitle", "true"),
143    ];
144
145    if let Some(text) = text {
146        query_params.push(("Text", text));
147    }
148
149    // Sort query parameters by key
150    query_params.sort_by(|a, b| a.0.cmp(b.0));
151
152    // Build query string without URL encoding
153    let query_string = query_params
154        .iter()
155        .map(|(k, v)| format!("{}={}", k, v))
156        .collect::<Vec<_>>()
157        .join("&");
158
159    let path = if streaming {
160        STREAMING_PATH
161    } else {
162        NON_STREAMING_PATH
163    };
164
165    let string_to_sign = format!("GET{}{}?{}", HOST, path, query_string);
166    // Calculate signature using HMAC-SHA1
167    let key = hmac::Key::new(hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, secret_key.as_bytes());
168    let tag = hmac::sign(&key, string_to_sign.as_bytes());
169    let signature: String = BASE64_STANDARD.encode(tag.as_ref());
170    // URL encode parameters for final URL
171    let encoded_query_string = query_params
172        .iter()
173        .map(|(k, v)| format!("{}={}", k, urlencoding::encode(v)))
174        .collect::<Vec<_>>()
175        .join("&");
176
177    format!(
178        "wss://{}{}?{}&Signature={}",
179        HOST,
180        path,
181        encoded_query_string,
182        urlencoding::encode(&signature)
183    )
184}
185
186// convert websocket to event stream
187// text and cmd_seq and cache key are used for non-streaming mode (realtime client)
188// text is for debuging purpose
189fn ws_to_event_stream<T>(
190    ws_stream: T,
191) -> impl Stream<Item = Result<SynthesisEvent>> + Send + 'static
192where
193    T: Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
194        + Send
195        + Unpin
196        + 'static,
197{
198    let notify = Arc::new(Notify::new());
199    let notify_clone = notify.clone();
200    ws_stream
201        .take_until(notify.notified_owned())
202        .filter_map(move |message| {
203            let notify = notify_clone.clone();
204            async move {
205                match message {
206                    Ok(Message::Binary(data)) => Some(Ok(SynthesisEvent::AudioChunk(data))),
207                    Ok(Message::Text(text)) => {
208                        let response: WebSocketResponse =
209                            serde_json::from_str(&text).expect("Tencent TTS API changed!");
210
211                        if response.code != 0 {
212                            notify.notify_one();
213                            return Some(Err(anyhow::anyhow!(
214                                "Tencent TTS error, code: {}, message: {}",
215                                response.code,
216                                response.message
217                            )));
218                        }
219
220                        if response.heartbeat == 1 {
221                            return None;
222                        }
223
224                        if let Some(subtitles) = response.result.subtitles {
225                            let subtitles: Vec<Subtitle> =
226                                subtitles.iter().map(Into::into).collect();
227                            return Some(Ok(SynthesisEvent::Subtitles(subtitles)));
228                        }
229
230                        // final == 1 means the synthesis is finished, should close the websocket
231                        if response.r#final == 1 {
232                            notify.notify_one();
233                            return Some(Ok(SynthesisEvent::Finished));
234                        }
235
236                        None
237                    }
238                    Ok(Message::Close(_)) => {
239                        notify.notify_one();
240                        warn!("Tencent TTS closed by remote");
241                        None
242                    }
243                    Err(e) => {
244                        notify.notify_one();
245                        Some(Err(anyhow::anyhow!("Tencent TTS websocket error: {:?}", e)))
246                    }
247                    _ => None,
248                }
249            }
250        })
251}
252
253// tencent cloud realtime tts client, non-streaming
254// https://cloud.tencent.com/document/product/1073/94308
255// each tts command have one websocket connection, with different session_id
256pub struct RealTimeClient {
257    option: SynthesisOption,
258    //item: (text, option), drop tx if `end_of_stream`
259    tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
260}
261
262impl RealTimeClient {
263    fn new(option: SynthesisOption) -> Self {
264        Self { option, tx: None }
265    }
266}
267
268#[async_trait]
269impl SynthesisClient for RealTimeClient {
270    fn provider(&self) -> SynthesisType {
271        SynthesisType::TencentCloud
272    }
273
274    async fn start(
275        &mut self,
276    ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
277        // Tencent cloud alow 10 - 20 concurrent websocket connections for default setting, dependent on voice type
278        // set the number more higher will lead to waiting for unordered results longer
279        let (tx, rx) = mpsc::unbounded_channel();
280        self.tx = Some(tx);
281        let client_option = self.option.clone();
282        let max_concurrent_tasks = client_option.max_concurrent_tasks.unwrap_or(1);
283        let stream = UnboundedReceiverStream::new(rx)
284            .flat_map_unordered(max_concurrent_tasks, move |(text, cmd_seq, option)| {
285                // each reequest have its own session_id
286                let session_id = Uuid::new_v4().to_string();
287                let option = client_option.merge_with(option);
288                let url = construct_request_url(&option, &session_id, Some(&text));
289                stream::once(connect_async(url))
290                    .flat_map(move |res| match res {
291                        Ok((ws_stream, _)) => ws_to_event_stream(ws_stream).boxed(),
292                        Err(e) => stream::once(future::ready(Err(e.into()))).boxed(),
293                    })
294                    .map(move |x| (cmd_seq, x))
295                    .boxed()
296            })
297            .boxed();
298        Ok(stream)
299    }
300
301    async fn synthesize(
302        &mut self,
303        text: &str,
304        cmd_seq: Option<usize>,
305        option: Option<SynthesisOption>,
306    ) -> Result<()> {
307        if let Some(tx) = &self.tx {
308            tx.send((text.to_string(), cmd_seq, option))?;
309        } else {
310            return Err(anyhow::anyhow!("TencentCloud TTS: missing client sender"));
311        };
312
313        Ok(())
314    }
315
316    async fn stop(&mut self) -> Result<()> {
317        self.tx.take();
318        Ok(())
319    }
320}
321
322// tencent cloud streaming tts client
323// https://cloud.tencent.com/document/product/1073/108595
324// all the tts commands with same play_id belong to same websocket connection
325struct StreamingClient {
326    session_id: String,
327    option: SynthesisOption,
328    sink: Option<WsSink>,
329}
330
331impl StreamingClient {
332    pub fn new(option: SynthesisOption) -> Self {
333        let session_id = Uuid::new_v4().to_string();
334        Self {
335            session_id,
336            option,
337            sink: None,
338        }
339    }
340}
341
342impl StreamingClient {
343    async fn connect(&self) -> Result<WsStream> {
344        let url = construct_request_url(&self.option, &self.session_id, None);
345        let (mut ws_stream, _) = connect_async(url).await?;
346        // waiting for ready = 1
347        while let Some(message) = ws_stream.next().await {
348            match message {
349                Ok(Message::Text(text)) => {
350                    let response = serde_json::from_str::<WebSocketResponse>(&text)?;
351                    if response.ready == 1 {
352                        debug!("TencentCloud TTS streaming client connected");
353                        return Ok(ws_stream);
354                    }
355
356                    if response.code != 0 {
357                        return Err(anyhow::anyhow!(
358                            "TencentCloud TTS streaming client connecting failed: code: {}, message: {}",
359                            response.code,
360                            response.message
361                        ));
362                    }
363                }
364                Ok(Message::Close { .. }) => {
365                    return Err(anyhow::anyhow!(
366                        "TencentCloud TTS streaming client connecting failed: closed by remote"
367                    ));
368                }
369                Err(e) => {
370                    return Err(anyhow::anyhow!(
371                        "TencentCloud TTS streaming client connecting failed: websocket error: {}",
372                        e
373                    ));
374                }
375                _ => {}
376            }
377        }
378
379        Err(anyhow::anyhow!(
380            "TencentCloud TTS streaming client connecting failed"
381        ))
382    }
383}
384
385#[async_trait]
386impl SynthesisClient for StreamingClient {
387    fn provider(&self) -> SynthesisType {
388        SynthesisType::TencentCloud
389    }
390
391    async fn start(
392        &mut self,
393    ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
394        let stream = self.connect().await?;
395        let (ws_sink, ws_stream) = stream.split();
396        self.sink = Some(ws_sink);
397        let stream = ws_to_event_stream(ws_stream)
398            .map(move |event| (None, event))
399            .boxed();
400        Ok(stream)
401    }
402
403    async fn synthesize(
404        &mut self,
405        text: &str,
406        _cmd_seq: Option<usize>,
407        _option: Option<SynthesisOption>,
408    ) -> Result<()> {
409        if let Some(sink) = &mut self.sink {
410            let request = WebSocketRequest::synthesis_action(&self.session_id, &text);
411            let data = serde_json::to_string(&request)?;
412            sink.send(Message::Text(data.into())).await?;
413
414            Ok(())
415        } else {
416            Err(anyhow::anyhow!("TencentCloud TTS streaming: missing sink"))
417        }
418    }
419
420    async fn stop(&mut self) -> Result<()> {
421        if let Some(sink) = &mut self.sink {
422            let request = WebSocketRequest::complete_action(&self.session_id);
423            let data = serde_json::to_string(&request)?;
424            sink.send(Message::Text(data.into())).await?;
425        }
426
427        Ok(())
428    }
429}
430
431pub struct TencentCloudTtsClient;
432
433impl TencentCloudTtsClient {
434    pub fn create(streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
435        if streaming {
436            Ok(Box::new(StreamingClient::new(option.clone())))
437        } else {
438            Ok(Box::new(RealTimeClient::new(option.clone())))
439        }
440    }
441}