active_call/synthesis/
supertonic.rs1use crate::offline::get_offline_models;
2use crate::synthesis::{SynthesisClient, SynthesisEvent, SynthesisOption, SynthesisType};
3use anyhow::{Result, anyhow};
4use async_trait::async_trait;
5use audio_codec::Resampler;
6use bytes::Bytes;
7use futures::stream::BoxStream;
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::UnboundedReceiverStream;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, warn};
12
13pub struct SupertonicTtsClient {
14 voice_style: String,
15 speed: f32,
16 target_rate: i32,
17 tx: Option<mpsc::UnboundedSender<(Option<usize>, Result<SynthesisEvent>)>>,
18 token: CancellationToken,
19}
20
21impl SupertonicTtsClient {
22 pub fn create(_streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
23 let voice_style = option.speaker.clone().unwrap_or_else(|| "M1".to_string());
24 let speed = option.speed.unwrap_or(1.0);
25 let target_rate = option.samplerate.unwrap_or(16000);
26
27 Ok(Box::new(Self {
28 voice_style,
29 speed,
30 target_rate,
31 tx: None,
32 token: CancellationToken::new(),
33 }))
34 }
35
36 fn ensure_models_initialized() -> Result<()> {
37 if get_offline_models().is_none() {
38 anyhow::bail!(
39 "Offline models not initialized. Please call init_offline_models() first."
40 );
41 }
42 Ok(())
43 }
44
45 async fn synthesize_text(&self, text: String, cmd_seq: Option<usize>) -> Result<()> {
46 let Some(tx) = self.tx.as_ref() else {
47 return Ok(());
48 };
49
50 let models =
51 get_offline_models().ok_or_else(|| anyhow!("offline models not initialized"))?;
52
53 let voice_style = self.voice_style.clone();
54 let speed = self.speed;
55 let target_rate = self.target_rate;
56 let tx_clone = tx.clone();
57 let language = "en".to_string();
59
60 let tts_arc = models.get_supertonic().await?;
61
62 tokio::task::spawn_blocking(move || {
64 let mut guard = match tts_arc.try_write() {
67 Ok(g) => g,
68 Err(_) => {
69 warn!("Supertonic TTS write lock unavailable, skipping synthesis");
70 let _ = tx_clone.send((cmd_seq, Err(anyhow!("TTS write lock unavailable"))));
71 return;
72 }
73 };
74
75 if let Some(tts) = guard.as_mut() {
76 debug!(
77 text = %text,
78 voice = %voice_style,
79 speed = speed,
80 target_rate = target_rate,
81 "Calling Supertonic TTS synthesis"
82 );
83
84 match tts.synthesize(&text, &language, Some(&voice_style), Some(speed)) {
85 Ok(samples) => {
86 if !samples.is_empty() {
87 let mut samples_i16: Vec<i16> = samples
88 .iter()
89 .map(|&s| (s * 32768.0).max(-32768.0).min(32767.0) as i16)
90 .collect();
91
92 if tts.sample_rate() != target_rate {
94 let mut resampler = Resampler::new(
95 tts.sample_rate() as usize,
96 target_rate as usize,
97 );
98 samples_i16 = resampler.resample(&samples_i16);
99 }
100
101 let mut bytes = Vec::with_capacity(samples_i16.len() * 2);
103 for s in samples_i16 {
104 bytes.extend_from_slice(&s.to_le_bytes());
105 }
106
107 let _ = tx_clone.send((
109 cmd_seq,
110 Ok(SynthesisEvent::AudioChunk(Bytes::from(bytes))),
111 ));
112
113 let _ = tx_clone.send((cmd_seq, Ok(SynthesisEvent::Finished)));
115 } else {
116 warn!("Supertonic produced empty audio");
117 let _ = tx_clone.send((cmd_seq, Ok(SynthesisEvent::Finished)));
118 }
119 }
120 Err(e) => {
121 warn!(error = %e, "Supertonic inference failed");
122 let _ = tx_clone.send((cmd_seq, Err(anyhow!("Synthesis failed: {}", e))));
123 }
124 }
125 } else {
126 warn!("Supertonic TTS not initialized");
127 let _ = tx_clone.send((cmd_seq, Err(anyhow!("TTS not initialized"))));
128 }
129 })
130 .await
131 .map_err(|e| anyhow!("task join error: {}", e))?;
132
133 Ok(())
134 }
135}
136
137#[async_trait]
138impl SynthesisClient for SupertonicTtsClient {
139 fn provider(&self) -> SynthesisType {
140 SynthesisType::Supertonic
141 }
142
143 async fn start(
144 &mut self,
145 ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
146 Self::ensure_models_initialized()?;
147
148 let (tx, rx) = mpsc::unbounded_channel();
149 self.tx = Some(tx);
150
151 let models =
153 get_offline_models().ok_or_else(|| anyhow!("offline models not initialized"))?;
154 models.init_supertonic().await?;
155
156 debug!(
157 "SupertonicTtsClient started with voice: {}",
158 self.voice_style
159 );
160
161 Ok(Box::pin(UnboundedReceiverStream::new(rx)))
162 }
163
164 async fn synthesize(
165 &mut self,
166 text: &str,
167 cmd_seq: Option<usize>,
168 _option: Option<SynthesisOption>,
169 ) -> Result<()> {
170 self.synthesize_text(text.to_string(), cmd_seq).await
171 }
172
173 async fn stop(&mut self) -> Result<()> {
174 self.token.cancel();
175 self.tx = None;
176 Ok(())
177 }
178}