active_call/synthesis/
msedge.rs

1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::SynthesisEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use audio_codec::Resampler;
6use bytes::Bytes;
7use futures::{
8    StreamExt,
9    stream::{self, BoxStream},
10};
11use msedge_tts::tts::SpeechConfig;
12use msedge_tts::tts::client::connect_async;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::UnboundedReceiverStream;
15
16pub struct MsEdgeTtsClient {
17    option: SynthesisOption,
18    tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
19}
20
21impl MsEdgeTtsClient {
22    pub fn create(_streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
23        Ok(Box::new(Self {
24            option: option.clone(),
25            tx: None,
26        }))
27    }
28
29    pub fn new(option: SynthesisOption) -> Self {
30        Self { option, tx: None }
31    }
32}
33
34#[async_trait]
35impl SynthesisClient for MsEdgeTtsClient {
36    fn provider(&self) -> SynthesisType {
37        SynthesisType::MsEdge
38    }
39
40    async fn start(
41        &mut self,
42    ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
43        let (tx, rx) = mpsc::unbounded_channel();
44        self.tx = Some(tx);
45        let client_option = self.option.clone();
46
47        let stream = UnboundedReceiverStream::new(rx)
48            .then(move |(text, seq, option)| {
49                let current_option = client_option.merge_with(option);
50                let text = text.clone();
51
52                async move {
53                    let result = async {
54                        let mut tts = connect_async().await.map_err(|e| {
55                            anyhow::anyhow!("Failed to connect to MsEdge TTS: {}", e)
56                        })?;
57
58                        let voice_name = current_option
59                            .speaker
60                            .clone()
61                            .unwrap_or_else(|| "zh-CN-XiaoxiaoNeural".to_string());
62
63                        let rate = current_option
64                            .speed
65                            .map(|s| ((s - 1.0) * 100.0) as i32)
66                            .unwrap_or(0);
67
68                        let config = SpeechConfig {
69                            voice_name,
70                            audio_format: "audio-24khz-48kbitrate-mono-mp3".to_string(),
71                            pitch: 0,
72                            rate,
73                            volume: 0,
74                        };
75
76                        tts.synthesize(&text, &config)
77                            .await
78                            .map(|audio| (audio, current_option))
79                            .map_err(|e| anyhow::anyhow!("MsEdge TTS error: {}", e))
80                    }
81                    .await;
82
83                    match result {
84                        Ok((audio, current_option)) => {
85                            let audio_bytes = audio.audio_bytes;
86                            let mut samples = Vec::new();
87                            let mut sample_rate = 0;
88                            let mut decoder = rmp3::Decoder::new(&audio_bytes);
89
90                            while let Some(frame) = decoder.next() {
91                                if let rmp3::Frame::Audio(audio) = frame {
92                                    if sample_rate == 0 {
93                                        sample_rate = audio.sample_rate();
94                                    }
95                                    samples.extend_from_slice(audio.samples());
96                                }
97                            }
98
99                            if !samples.is_empty() {
100                                let target_rate = current_option.samplerate.unwrap_or(16000) as u32;
101                                if sample_rate > 0 && sample_rate != target_rate {
102                                    let mut resampler =
103                                        Resampler::new(sample_rate as usize, target_rate as usize);
104                                    samples = resampler.resample(&samples);
105                                }
106
107                                let mut pcm_bytes = Vec::with_capacity(samples.len() * 2);
108                                for s in samples {
109                                    pcm_bytes.extend_from_slice(&s.to_le_bytes());
110                                }
111
112                                let events = vec![
113                                    Ok(SynthesisEvent::AudioChunk(Bytes::from(pcm_bytes))),
114                                    Ok(SynthesisEvent::Finished),
115                                ];
116                                (seq, stream::iter(events).boxed())
117                            } else {
118                                let events = vec![Ok(SynthesisEvent::Finished)];
119                                (seq, stream::iter(events).boxed())
120                            }
121                        }
122                        Err(e) => (seq, stream::once(async move { Err(e) }).boxed()),
123                    }
124                }
125            })
126            .flat_map(|(seq, stream)| stream.map(move |event| (seq, event)))
127            .boxed();
128
129        Ok(stream)
130    }
131
132    async fn synthesize(
133        &mut self,
134        text: &str,
135        cmd_seq: Option<usize>,
136        option: Option<SynthesisOption>,
137    ) -> Result<()> {
138        if let Some(tx) = &self.tx {
139            tx.send((text.to_string(), cmd_seq, option))?;
140            Ok(())
141        } else {
142            Err(anyhow::anyhow!("MsEdge TTS: client not started"))
143        }
144    }
145
146    async fn stop(&mut self) -> Result<()> {
147        self.tx.take();
148        Ok(())
149    }
150}