active_call/synthesis/
tencent_cloud.rs

1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::{Subtitle, SynthesisEvent};
3use anyhow::Result;
4use async_trait::async_trait;
5use base64::{Engine, prelude::BASE64_STANDARD};
6use chrono::Duration;
7use futures::{
8    SinkExt, Stream, StreamExt, future,
9    stream::{self, BoxStream, SplitSink},
10};
11use ring::hmac;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14use tokio::{
15    net::TcpStream,
16    sync::{Notify, mpsc},
17};
18use tokio_stream::wrappers::UnboundedReceiverStream;
19use tokio_tungstenite::{
20    MaybeTlsStream, WebSocketStream, connect_async, tungstenite::protocol::Message,
21};
22use tracing::{debug, warn};
23use unic_emoji::char::is_emoji;
24use urlencoding;
25use uuid::Uuid;
26
/// Hostname of the Tencent Cloud TTS WebSocket endpoint; it appears both in the
/// request URL and in the string that gets signed.
const HOST: &str = "tts.cloud.tencent.com";
/// Request path used together with the `TextToStreamAudioWS` action (text is passed in the URL).
const NON_STREAMING_PATH: &str = "/stream_ws";
/// Request path used together with the `TextToStreamAudioWSv2` action (no text in the URL).
const STREAMING_PATH: &str = "/stream_wsv2";
30
31type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
32type WsSink = SplitSink<WsStream, Message>;
33
34#[derive(Debug, Serialize)]
35struct WebSocketRequest {
36    session_id: String,
37    message_id: String,
38    action: String,
39    data: String,
40}
41
42impl WebSocketRequest {
43    fn synthesis_action(session_id: &str, text: &str) -> Self {
44        let message_id = Uuid::new_v4().to_string();
45        Self {
46            session_id: session_id.to_string(),
47            message_id,
48            action: "ACTION_SYNTHESIS".to_string(),
49            data: text.to_string(),
50        }
51    }
52
53    fn complete_action(session_id: &str) -> Self {
54        let message_id = Uuid::new_v4().to_string();
55        Self {
56            session_id: session_id.to_string(),
57            message_id: message_id.to_string(),
58            action: "ACTION_COMPLETE".to_string(),
59            data: "".to_string(),
60        }
61    }
62}
63
64#[derive(Debug, Deserialize)]
65struct WebSocketResponse {
66    code: i32,
67    message: String,
68    r#final: i32,
69    result: WebSocketResult,
70    ready: u32,
71    heartbeat: u32,
72}
73
74#[derive(Debug, Deserialize)]
75struct WebSocketResult {
76    subtitles: Option<Vec<TencentSubtitle>>,
77}
78
79#[derive(Debug, Deserialize)]
80#[serde(rename_all = "PascalCase")]
81pub(super) struct TencentSubtitle {
82    text: String,
83    begin_time: u32,
84    end_time: u32,
85    begin_index: u32,
86    end_index: u32,
87}
88
89impl From<&TencentSubtitle> for Subtitle {
90    fn from(subtitle: &TencentSubtitle) -> Self {
91        Subtitle::new(
92            subtitle.text.clone(),
93            subtitle.begin_time,
94            subtitle.end_time,
95            subtitle.begin_index,
96            subtitle.end_index,
97        )
98    }
99}
100
101// tencent cloud will crash if text contains emoji
102// Only remove non-ASCII emoji characters. Keep all ASCII (digits, letters, punctuation),
103// since some ASCII (e.g., '0'..'9', '#', '*') are marked with the Unicode Emoji property
104// due to keycap sequences but are safe and expected in text.
105pub fn strip_emoji_chars(text: &str) -> String {
106    text.chars()
107        .filter(|&c| c.is_ascii() || !is_emoji(c))
108        .collect()
109}
110
111// construct request url
112// for non-streaming client, text is Some
113// session_id is used for tencent cloud tts service, not the session_id of media session
114fn construct_request_url(option: &SynthesisOption, session_id: &str, text: Option<&str>) -> String {
115    let streaming = text.is_none();
116    let action = if !streaming {
117        "TextToStreamAudioWS"
118    } else {
119        "TextToStreamAudioWSv2"
120    };
121    let secret_id = option.secret_id.clone().unwrap_or_default();
122    let secret_key = option.secret_key.clone().unwrap_or_default();
123    let app_id = option.app_id.clone().unwrap_or_default();
124    let volume = option.volume.unwrap_or(0);
125    let speed = option.speed.unwrap_or(0.0);
126    let codec = option.codec.clone().unwrap_or_else(|| "pcm".to_string());
127    let sample_rate = option.samplerate.unwrap_or(16000);
128    let now = chrono::Utc::now();
129    let timestamp = now.timestamp();
130    let tomorrow = now + Duration::days(1);
131    let expired = tomorrow.timestamp();
132    let expired_str = expired.to_string();
133    let sample_rate_str = sample_rate.to_string();
134    let speed_str = speed.to_string();
135    let timestamp_str = timestamp.to_string();
136    let volume_str = volume.to_string();
137    let voice_type = option
138        .speaker
139        .clone()
140        .unwrap_or_else(|| "101001".to_string());
141    let mut query_params = vec![
142        ("Action", action),
143        ("AppId", &app_id),
144        ("SecretId", &secret_id),
145        ("Timestamp", &timestamp_str),
146        ("Expired", &expired_str),
147        ("SessionId", &session_id),
148        ("VoiceType", &voice_type),
149        ("Volume", &volume_str),
150        ("Speed", &speed_str),
151        ("SampleRate", &sample_rate_str),
152        ("Codec", &codec),
153        ("EnableSubtitle", "true"),
154    ];
155
156    if let Some(text) = text {
157        query_params.push(("Text", text));
158    }
159
160    // Sort query parameters by key
161    query_params.sort_by(|a, b| a.0.cmp(b.0));
162
163    // Build query string without URL encoding
164    let query_string = query_params
165        .iter()
166        .map(|(k, v)| format!("{}={}", k, v))
167        .collect::<Vec<_>>()
168        .join("&");
169
170    let path = if streaming {
171        STREAMING_PATH
172    } else {
173        NON_STREAMING_PATH
174    };
175
176    let string_to_sign = format!("GET{}{}?{}", HOST, path, query_string);
177    // Calculate signature using HMAC-SHA1
178    let key = hmac::Key::new(hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, secret_key.as_bytes());
179    let tag = hmac::sign(&key, string_to_sign.as_bytes());
180    let signature: String = BASE64_STANDARD.encode(tag.as_ref());
181    // URL encode parameters for final URL
182    let encoded_query_string = query_params
183        .iter()
184        .map(|(k, v)| format!("{}={}", k, urlencoding::encode(v)))
185        .collect::<Vec<_>>()
186        .join("&");
187
188    format!(
189        "wss://{}{}?{}&Signature={}",
190        HOST,
191        path,
192        encoded_query_string,
193        urlencoding::encode(&signature)
194    )
195}
196
197// convert websocket to event stream
198// text and cmd_seq and cache key are used for non-streaming mode (realtime client)
199// text is for debuging purpose
200fn ws_to_event_stream<T>(
201    ws_stream: T,
202) -> impl Stream<Item = Result<SynthesisEvent>> + Send + 'static
203where
204    T: Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
205        + Send
206        + Unpin
207        + 'static,
208{
209    let notify = Arc::new(Notify::new());
210    let notify_clone = notify.clone();
211    ws_stream
212        .take_until(notify.notified_owned())
213        .filter_map(move |message| {
214            let notify = notify_clone.clone();
215            async move {
216                match message {
217                    Ok(Message::Binary(data)) => Some(Ok(SynthesisEvent::AudioChunk(data))),
218                    Ok(Message::Text(text)) => {
219                        let response: WebSocketResponse =
220                            serde_json::from_str(&text).expect("Tencent TTS API changed!");
221
222                        if response.code != 0 {
223                            notify.notify_one();
224                            return Some(Err(anyhow::anyhow!(
225                                "Tencent TTS error, code: {}, message: {}",
226                                response.code,
227                                response.message
228                            )));
229                        }
230
231                        if response.heartbeat == 1 {
232                            return None;
233                        }
234
235                        if let Some(subtitles) = response.result.subtitles {
236                            let subtitles: Vec<Subtitle> =
237                                subtitles.iter().map(Into::into).collect();
238                            return Some(Ok(SynthesisEvent::Subtitles(subtitles)));
239                        }
240
241                        // final == 1 means the synthesis is finished, should close the websocket
242                        if response.r#final == 1 {
243                            notify.notify_one();
244                            return Some(Ok(SynthesisEvent::Finished));
245                        }
246
247                        None
248                    }
249                    Ok(Message::Close(_)) => {
250                        notify.notify_one();
251                        warn!("Tencent TTS closed by remote");
252                        None
253                    }
254                    Err(e) => {
255                        notify.notify_one();
256                        Some(Err(anyhow::anyhow!("Tencent TTS websocket error: {:?}", e)))
257                    }
258                    _ => None,
259                }
260            }
261        })
262}
263
264// tencent cloud realtime tts client, non-streaming
265// https://cloud.tencent.com/document/product/1073/94308
266// each tts command have one websocket connection, with different session_id
267pub struct RealTimeClient {
268    option: SynthesisOption,
269    //item: (text, option), drop tx if `end_of_stream`
270    tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
271}
272
273impl RealTimeClient {
274    fn new(option: SynthesisOption) -> Self {
275        Self { option, tx: None }
276    }
277}
278
279#[async_trait]
280impl SynthesisClient for RealTimeClient {
281    fn provider(&self) -> SynthesisType {
282        SynthesisType::TencentCloud
283    }
284
285    async fn start(
286        &mut self,
287    ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
288        // Tencent cloud alow 10 - 20 concurrent websocket connections for default setting, dependent on voice type
289        // set the number more higher will lead to waiting for unordered results longer
290        let (tx, rx) = mpsc::unbounded_channel();
291        self.tx = Some(tx);
292        let client_option = self.option.clone();
293        let max_concurrent_tasks = client_option.max_concurrent_tasks.unwrap_or(1);
294        let stream = UnboundedReceiverStream::new(rx)
295            .flat_map_unordered(max_concurrent_tasks, move |(text, cmd_seq, option)| {
296                // each reequest have its own session_id
297                let session_id = Uuid::new_v4().to_string();
298                let option = client_option.merge_with(option);
299                let url = construct_request_url(&option, &session_id, Some(&text));
300                stream::once(connect_async(url))
301                    .flat_map(move |res| match res {
302                        Ok((ws_stream, _)) => ws_to_event_stream(ws_stream).boxed(),
303                        Err(e) => stream::once(future::ready(Err(e.into()))).boxed(),
304                    })
305                    .map(move |x| (cmd_seq, x))
306                    .boxed()
307            })
308            .boxed();
309        Ok(stream)
310    }
311
312    async fn synthesize(
313        &mut self,
314        text: &str,
315        cmd_seq: Option<usize>,
316        option: Option<SynthesisOption>,
317    ) -> Result<()> {
318        if let Some(tx) = &self.tx {
319            let text = strip_emoji_chars(text);
320            tx.send((text, cmd_seq, option))?;
321        } else {
322            return Err(anyhow::anyhow!("TencentCloud TTS: missing client sender"));
323        };
324
325        Ok(())
326    }
327
328    async fn stop(&mut self) -> Result<()> {
329        self.tx.take();
330        Ok(())
331    }
332}
333
334// tencent cloud streaming tts client
335// https://cloud.tencent.com/document/product/1073/108595
336// all the tts commands with same play_id belong to same websocket connection
337struct StreamingClient {
338    session_id: String,
339    option: SynthesisOption,
340    sink: Option<WsSink>,
341}
342
343impl StreamingClient {
344    pub fn new(option: SynthesisOption) -> Self {
345        let session_id = Uuid::new_v4().to_string();
346        Self {
347            session_id,
348            option,
349            sink: None,
350        }
351    }
352}
353
354impl StreamingClient {
355    async fn connect(&self) -> Result<WsStream> {
356        let url = construct_request_url(&self.option, &self.session_id, None);
357        let (mut ws_stream, _) = connect_async(url).await?;
358        // waiting for ready = 1
359        while let Some(message) = ws_stream.next().await {
360            match message {
361                Ok(Message::Text(text)) => {
362                    let response = serde_json::from_str::<WebSocketResponse>(&text)?;
363                    if response.ready == 1 {
364                        debug!("TencentCloud TTS streaming client connected");
365                        return Ok(ws_stream);
366                    }
367
368                    if response.code != 0 {
369                        return Err(anyhow::anyhow!(
370                            "TencentCloud TTS streaming client connecting failed: code: {}, message: {}",
371                            response.code,
372                            response.message
373                        ));
374                    }
375                }
376                Ok(Message::Close { .. }) => {
377                    return Err(anyhow::anyhow!(
378                        "TencentCloud TTS streaming client connecting failed: closed by remote"
379                    ));
380                }
381                Err(e) => {
382                    return Err(anyhow::anyhow!(
383                        "TencentCloud TTS streaming client connecting failed: websocket error: {}",
384                        e
385                    ));
386                }
387                _ => {}
388            }
389        }
390
391        Err(anyhow::anyhow!(
392            "TencentCloud TTS streaming client connecting failed"
393        ))
394    }
395}
396
397#[async_trait]
398impl SynthesisClient for StreamingClient {
399    fn provider(&self) -> SynthesisType {
400        SynthesisType::TencentCloud
401    }
402
403    async fn start(
404        &mut self,
405    ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
406        let stream = self.connect().await?;
407        let (ws_sink, ws_stream) = stream.split();
408        self.sink = Some(ws_sink);
409        let stream = ws_to_event_stream(ws_stream)
410            .map(move |event| (None, event))
411            .boxed();
412        Ok(stream)
413    }
414
415    async fn synthesize(
416        &mut self,
417        text: &str,
418        _cmd_seq: Option<usize>,
419        _option: Option<SynthesisOption>,
420    ) -> Result<()> {
421        if let Some(sink) = &mut self.sink {
422            let text = strip_emoji_chars(text);
423            let request = WebSocketRequest::synthesis_action(&self.session_id, &text);
424            let data = serde_json::to_string(&request)?;
425            sink.send(Message::Text(data.into())).await?;
426
427            Ok(())
428        } else {
429            Err(anyhow::anyhow!("TencentCloud TTS streaming: missing sink"))
430        }
431    }
432
433    async fn stop(&mut self) -> Result<()> {
434        if let Some(sink) = &mut self.sink {
435            let request = WebSocketRequest::complete_action(&self.session_id);
436            let data = serde_json::to_string(&request)?;
437            sink.send(Message::Text(data.into())).await?;
438        }
439
440        Ok(())
441    }
442}
443
444pub struct TencentCloudTtsClient;
445
446impl TencentCloudTtsClient {
447    pub fn create(streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
448        if streaming {
449            Ok(Box::new(StreamingClient::new(option.clone())))
450        } else {
451            Ok(Box::new(RealTimeClient::new(option.clone())))
452        }
453    }
454}