active_call/synthesis/
msedge.rs1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::SynthesisEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use audio_codec::Resampler;
6use bytes::Bytes;
7use futures::{
8 StreamExt,
9 stream::{self, BoxStream},
10};
11use msedge_tts::tts::SpeechConfig;
12use msedge_tts::tts::client::connect_async;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::UnboundedReceiverStream;
15
16pub struct MsEdgeTtsClient {
17 option: SynthesisOption,
18 tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
19}
20
21impl MsEdgeTtsClient {
22 pub fn create(_streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
23 Ok(Box::new(Self {
24 option: option.clone(),
25 tx: None,
26 }))
27 }
28
29 pub fn new(option: SynthesisOption) -> Self {
30 Self { option, tx: None }
31 }
32}
33
34#[async_trait]
35impl SynthesisClient for MsEdgeTtsClient {
36 fn provider(&self) -> SynthesisType {
37 SynthesisType::MsEdge
38 }
39
40 async fn start(
41 &mut self,
42 ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
43 let (tx, rx) = mpsc::unbounded_channel();
44 self.tx = Some(tx);
45 let client_option = self.option.clone();
46
47 let stream = UnboundedReceiverStream::new(rx)
48 .then(move |(text, seq, option)| {
49 let current_option = client_option.merge_with(option);
50 let text = text.clone();
51
52 async move {
53 let result = async {
54 let mut tts = connect_async().await.map_err(|e| {
55 anyhow::anyhow!("Failed to connect to MsEdge TTS: {}", e)
56 })?;
57
58 let voice_name = current_option
59 .speaker
60 .clone()
61 .unwrap_or_else(|| "zh-CN-XiaoxiaoNeural".to_string());
62
63 let rate = current_option
64 .speed
65 .map(|s| ((s - 1.0) * 100.0) as i32)
66 .unwrap_or(0);
67
68 let config = SpeechConfig {
69 voice_name,
70 audio_format: "audio-24khz-48kbitrate-mono-mp3".to_string(),
71 pitch: 0,
72 rate,
73 volume: 0,
74 };
75
76 tts.synthesize(&text, &config)
77 .await
78 .map(|audio| (audio, current_option))
79 .map_err(|e| anyhow::anyhow!("MsEdge TTS error: {}", e))
80 }
81 .await;
82
83 match result {
84 Ok((audio, current_option)) => {
85 let audio_bytes = audio.audio_bytes;
86 let mut samples = Vec::new();
87 let mut sample_rate = 0;
88 let mut decoder = rmp3::Decoder::new(&audio_bytes);
89
90 while let Some(frame) = decoder.next() {
91 if let rmp3::Frame::Audio(audio) = frame {
92 if sample_rate == 0 {
93 sample_rate = audio.sample_rate();
94 }
95 samples.extend_from_slice(audio.samples());
96 }
97 }
98
99 if !samples.is_empty() {
100 let target_rate = current_option.samplerate.unwrap_or(16000) as u32;
101 if sample_rate > 0 && sample_rate != target_rate {
102 let mut resampler =
103 Resampler::new(sample_rate as usize, target_rate as usize);
104 samples = resampler.resample(&samples);
105 }
106
107 let mut pcm_bytes = Vec::with_capacity(samples.len() * 2);
108 for s in samples {
109 pcm_bytes.extend_from_slice(&s.to_le_bytes());
110 }
111
112 let events = vec![
113 Ok(SynthesisEvent::AudioChunk(Bytes::from(pcm_bytes))),
114 Ok(SynthesisEvent::Finished),
115 ];
116 (seq, stream::iter(events).boxed())
117 } else {
118 let events = vec![Ok(SynthesisEvent::Finished)];
119 (seq, stream::iter(events).boxed())
120 }
121 }
122 Err(e) => (seq, stream::once(async move { Err(e) }).boxed()),
123 }
124 }
125 })
126 .flat_map(|(seq, stream)| stream.map(move |event| (seq, event)))
127 .boxed();
128
129 Ok(stream)
130 }
131
132 async fn synthesize(
133 &mut self,
134 text: &str,
135 cmd_seq: Option<usize>,
136 option: Option<SynthesisOption>,
137 ) -> Result<()> {
138 if let Some(tx) = &self.tx {
139 tx.send((text.to_string(), cmd_seq, option))?;
140 Ok(())
141 } else {
142 Err(anyhow::anyhow!("MsEdge TTS: client not started"))
143 }
144 }
145
146 async fn stop(&mut self) -> Result<()> {
147 self.tx.take();
148 Ok(())
149 }
150}