Skip to main content

active_call/synthesis/
supertonic.rs

1use crate::offline::get_offline_models;
2use crate::synthesis::{SynthesisClient, SynthesisEvent, SynthesisOption, SynthesisType};
3use anyhow::{Result, anyhow};
4use async_trait::async_trait;
5use audio_codec::Resampler;
6use bytes::Bytes;
7use futures::stream::BoxStream;
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::UnboundedReceiverStream;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, warn};
12
13pub struct SupertonicTtsClient {
14    voice_style: String,
15    speed: f32,
16    target_rate: i32,
17    tx: Option<mpsc::UnboundedSender<(Option<usize>, Result<SynthesisEvent>)>>,
18    token: CancellationToken,
19}
20
21impl SupertonicTtsClient {
22    pub fn create(_streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
23        let voice_style = option.speaker.clone().unwrap_or_else(|| "M1".to_string());
24        let speed = option.speed.unwrap_or(1.0);
25        let target_rate = option.samplerate.unwrap_or(16000);
26
27        Ok(Box::new(Self {
28            voice_style,
29            speed,
30            target_rate,
31            tx: None,
32            token: CancellationToken::new(),
33        }))
34    }
35
36    fn ensure_models_initialized() -> Result<()> {
37        if get_offline_models().is_none() {
38            anyhow::bail!(
39                "Offline models not initialized. Please call init_offline_models() first."
40            );
41        }
42        Ok(())
43    }
44
45    async fn synthesize_text(&self, text: String, cmd_seq: Option<usize>) -> Result<()> {
46        let Some(tx) = self.tx.as_ref() else {
47            return Ok(());
48        };
49
50        let models =
51            get_offline_models().ok_or_else(|| anyhow!("offline models not initialized"))?;
52
53        let voice_style = self.voice_style.clone();
54        let speed = self.speed;
55        let target_rate = self.target_rate;
56        let tx_clone = tx.clone();
57        // Supertonic: en is hardcoded for now, or detect from text?
58        let language = "en".to_string();
59
60        let tts_arc = models.get_supertonic().await?;
61
62        // Run synthesis in blocking task
63        tokio::task::spawn_blocking(move || {
64            // Use try_write to avoid potential panics or deadlocks;
65            // blocking_write can panic/deadlock when the lock is held by an async task.
66            let mut guard = match tts_arc.try_write() {
67                Ok(g) => g,
68                Err(_) => {
69                    warn!("Supertonic TTS write lock unavailable, skipping synthesis");
70                    let _ = tx_clone.send((cmd_seq, Err(anyhow!("TTS write lock unavailable"))));
71                    return;
72                }
73            };
74
75            if let Some(tts) = guard.as_mut() {
76                debug!(
77                    text = %text,
78                    voice = %voice_style,
79                    speed = speed,
80                    target_rate = target_rate,
81                    "Calling Supertonic TTS synthesis"
82                );
83
84                match tts.synthesize(&text, &language, Some(&voice_style), Some(speed)) {
85                    Ok(samples) => {
86                        if !samples.is_empty() {
87                            let mut samples_i16: Vec<i16> = samples
88                                .iter()
89                                .map(|&s| (s * 32768.0).max(-32768.0).min(32767.0) as i16)
90                                .collect();
91
92                            // Resample if needed
93                            if tts.sample_rate() != target_rate {
94                                let mut resampler = Resampler::new(
95                                    tts.sample_rate() as usize,
96                                    target_rate as usize,
97                                );
98                                samples_i16 = resampler.resample(&samples_i16);
99                            }
100
101                            // Convert i16 samples to PCM bytes
102                            let mut bytes = Vec::with_capacity(samples_i16.len() * 2);
103                            for s in samples_i16 {
104                                bytes.extend_from_slice(&s.to_le_bytes());
105                            }
106
107                            // Send AudioChunk
108                            let _ = tx_clone.send((
109                                cmd_seq,
110                                Ok(SynthesisEvent::AudioChunk(Bytes::from(bytes))),
111                            ));
112
113                            // Send Finished
114                            let _ = tx_clone.send((cmd_seq, Ok(SynthesisEvent::Finished)));
115                        } else {
116                            warn!("Supertonic produced empty audio");
117                            let _ = tx_clone.send((cmd_seq, Ok(SynthesisEvent::Finished)));
118                        }
119                    }
120                    Err(e) => {
121                        warn!(error = %e, "Supertonic inference failed");
122                        let _ = tx_clone.send((cmd_seq, Err(anyhow!("Synthesis failed: {}", e))));
123                    }
124                }
125            } else {
126                warn!("Supertonic TTS not initialized");
127                let _ = tx_clone.send((cmd_seq, Err(anyhow!("TTS not initialized"))));
128            }
129        })
130        .await
131        .map_err(|e| anyhow!("task join error: {}", e))?;
132
133        Ok(())
134    }
135}
136
137#[async_trait]
138impl SynthesisClient for SupertonicTtsClient {
139    fn provider(&self) -> SynthesisType {
140        SynthesisType::Supertonic
141    }
142
143    async fn start(
144        &mut self,
145    ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
146        Self::ensure_models_initialized()?;
147
148        let (tx, rx) = mpsc::unbounded_channel();
149        self.tx = Some(tx);
150
151        // Initialize TTS if needed
152        let models =
153            get_offline_models().ok_or_else(|| anyhow!("offline models not initialized"))?;
154        models.init_supertonic().await?;
155
156        debug!(
157            "SupertonicTtsClient started with voice: {}",
158            self.voice_style
159        );
160
161        Ok(Box::pin(UnboundedReceiverStream::new(rx)))
162    }
163
164    async fn synthesize(
165        &mut self,
166        text: &str,
167        cmd_seq: Option<usize>,
168        _option: Option<SynthesisOption>,
169    ) -> Result<()> {
170        self.synthesize_text(text.to_string(), cmd_seq).await
171    }
172
173    async fn stop(&mut self) -> Result<()> {
174        self.token.cancel();
175        self.tx = None;
176        Ok(())
177    }
178}