active_call/synthesis/tencent_cloud_basic.rs

1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::{SynthesisEvent, tencent_cloud::TencentSubtitle};
3use anyhow::Result;
4use async_trait::async_trait;
5use aws_lc_rs::hmac;
6use base64::{Engine, prelude::BASE64_STANDARD};
7use bytes::Bytes;
8use futures::{
9    FutureExt, StreamExt, future,
10    stream::{self, BoxStream},
11};
12use rand::Rng;
13use serde::Deserialize;
14use tokio::sync::mpsc;
15use tokio_stream::wrappers::UnboundedReceiverStream;
16use unic_emoji::char::is_emoji;
17use urlencoding;
18use uuid::Uuid;
19
/// Host name of the Tencent Cloud TTS API; it is also part of the v1 string-to-sign.
const HOST: &str = "tts.tencentcloudapi.com";
/// Request path of the Tencent Cloud TTS API; it is also part of the v1 string-to-sign.
const PATH: &str = "/";
22
23#[derive(Debug, Deserialize)]
24struct Response {
25    #[serde(rename = "Response")]
26    response: ResponseData,
27}
28
29#[derive(Debug, Deserialize)]
30#[serde(rename_all = "PascalCase")]
31struct ResponseData {
32    #[serde(default)]
33    audio: String,
34    #[serde(default)]
35    subtitles: Vec<TencentSubtitle>,
36    error: Option<TencentError>,
37}
38
39#[derive(Debug, Deserialize)]
40#[serde(rename_all = "PascalCase")]
41struct TencentError {
42    code: String,
43    message: String,
44}
45
46// tencent cloud will crash if text contains emoji
47// Only remove non-ASCII emoji characters. Keep all ASCII (digits, letters, punctuation),
48// since some ASCII (e.g., '0'..'9', '#', '*') are marked with the Unicode Emoji property
49// due to keycap sequences but are safe and expected in text.
50pub fn strip_emoji_chars(text: &str) -> String {
51    text.chars()
52        .filter(|&c| c.is_ascii() || !is_emoji(c))
53        .collect()
54}
55
56// construct request url
57// for non-streaming client, text is Some
58// session_id is used for tencent cloud tts service, not the session_id of media session
59fn construct_request_url(option: &SynthesisOption, session_id: &str, text: &str) -> String {
60    let timestamp = chrono::Utc::now().timestamp().to_string();
61    let nonce = rand::thread_rng().r#gen::<u64>().to_string();
62    let session_id = session_id.to_string();
63    let secret_id = option.secret_id.clone().unwrap_or_default();
64    let secret_key = option.secret_key.clone().unwrap_or_default();
65    let volume = option.volume.unwrap_or(0).to_string();
66    let speed = option.speed.unwrap_or(0.0).to_string();
67    let voice_type = option
68        .speaker
69        .as_ref()
70        .map(String::as_str)
71        .unwrap_or("501004");
72    let sample_rate = option.samplerate.unwrap_or(16000).to_string();
73    let codec = option.codec.as_ref().map(String::as_str).unwrap_or("pcm");
74    let mut query_params = vec![
75        ("Action", "TextToVoice"),
76        ("Timestamp", &timestamp),
77        ("Nonce", &nonce),
78        ("SecretId", &secret_id),
79        ("Version", "2019-08-23"),
80        ("Text", &text),
81        ("SessionId", &session_id),
82        ("Volume", &volume),
83        ("Speed", &speed),
84        ("VoiceType", &voice_type),
85        ("SampleRate", &sample_rate),
86        ("Codec", &codec),
87        ("EnableSubtitle", "true"),
88    ];
89
90    // Sort query parameters by key
91    query_params.sort_by_key(|(k, _)| *k);
92    // Build query string without URL encoding
93    let query_string = query_params
94        .iter()
95        .map(|(k, v)| format!("{}={}", k, v))
96        .collect::<Vec<_>>()
97        .join("&");
98    let string_to_sign = format!("GET{}{}?{}", HOST, PATH, query_string);
99
100    // Calculate signature using HMAC-SHA1
101    let key = hmac::Key::new(hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, secret_key.as_bytes());
102    let tag = hmac::sign(&key, string_to_sign.as_bytes());
103    let signature: String = BASE64_STANDARD.encode(tag.as_ref());
104    query_params.push(("Signature", &signature));
105
106    // URL encode parameters for final URL
107    let encoded_query_string = query_params
108        .iter()
109        .map(|(k, v)| format!("{}={}", k, urlencoding::encode(v)))
110        .collect::<Vec<_>>()
111        .join("&");
112
113    format!("https://{}{}?{}", HOST, PATH, encoded_query_string)
114}
115
116// tencent cloud realtime tts client, non-streaming
117// https://cloud.tencent.com/document/product/1073/94308
118// each tts command have one websocket connection, with different session_id
119
120#[async_trait]
121impl SynthesisClient for TencentCloudTtsBasicClient {
122    fn provider(&self) -> SynthesisType {
123        SynthesisType::Other("tencent_basic".to_string())
124    }
125
126    async fn start(
127        &mut self,
128    ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
129        // Tencent cloud alow 10 - 20 concurrent websocket connections for default setting, dependent on voice type
130        // set the number more higher will lead to waiting for unordered results longer
131        let (tx, rx) = mpsc::unbounded_channel();
132        self.tx = Some(tx);
133        let client_option = self.option.clone();
134        let max_concurrent_tasks = client_option.max_concurrent_tasks.unwrap_or(1);
135        let stream = UnboundedReceiverStream::new(rx)
136            .flat_map_unordered(max_concurrent_tasks, move |(text, seq, option)| {
137                // each reequest have its own session_id
138                let session_id = Uuid::new_v4().to_string();
139                let option = client_option.merge_with(option);
140                let url = construct_request_url(&option, &session_id, &text);
141                // request tencent cloud tts
142                let fut = reqwest::get(url).then(async |res| {
143                    let resp = res?.json::<Response>().await?;
144                    if let Some(error) = resp.response.error {
145                        return Err(anyhow::anyhow!(
146                            "Tencent TTS error, code: {}, message: {}",
147                            error.code,
148                            error.message
149                        ));
150                    }
151                    let audio = BASE64_STANDARD.decode(resp.response.audio)?;
152                    Ok((audio, resp.response.subtitles))
153                });
154
155                // convert result to events
156                stream::once(fut)
157                    .flat_map(|res| match res {
158                        Ok((audio, subtitles)) => {
159                            let mut events = Vec::new();
160                            events.push(Ok(SynthesisEvent::AudioChunk(Bytes::from(audio))));
161                            if !subtitles.is_empty() {
162                                events.push(Ok(SynthesisEvent::Subtitles(
163                                    subtitles.iter().map(Into::into).collect(),
164                                )));
165                            }
166                            events.push(Ok(SynthesisEvent::Finished));
167                            stream::iter(events).boxed()
168                        }
169                        Err(e) => stream::once(future::ready(Err(e))).boxed(),
170                    })
171                    .map(move |x| (seq, x))
172                    .boxed()
173            })
174            .boxed();
175        Ok(stream)
176    }
177
178    async fn synthesize(
179        &mut self,
180        text: &str,
181        cmd_seq: Option<usize>,
182        option: Option<SynthesisOption>,
183    ) -> Result<()> {
184        if let Some(tx) = &self.tx {
185            let text = strip_emoji_chars(text);
186            tx.send((text, cmd_seq, option))?;
187        } else {
188            return Err(anyhow::anyhow!("TencentCloud TTS: missing client sender"));
189        };
190
191        Ok(())
192    }
193
194    async fn stop(&mut self) -> Result<()> {
195        self.tx.take();
196        Ok(())
197    }
198}
199
200// tencent basic tts
201// https://cloud.tencent.com/document/product/1073/37995
202pub struct TencentCloudTtsBasicClient {
203    option: SynthesisOption,
204    //item: (text, option), drop tx if `end_of_stream`
205    tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
206}
207
208impl TencentCloudTtsBasicClient {
209    pub fn create(_streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
210        Ok(Box::new(Self {
211            option: option.clone(),
212            tx: None,
213        }))
214    }
215}