active_call/synthesis/
voiceapi.rs

1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::SynthesisEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::{
6    FutureExt, SinkExt, Stream, StreamExt,
7    stream::{self, BoxStream},
8};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use tokio::sync::{Notify, mpsc};
12use tokio_stream::wrappers::UnboundedReceiverStream;
13use tokio_tungstenite::{connect_async, tungstenite::Message};
14use tracing::{debug, warn};
15
16/// https://github.com/ruzhila/voiceapi
17/// A simple and clean voice transcription/synthesis API with sherpa-onnx
18///
19#[derive(Debug)]
20pub struct VoiceApiTtsClient {
21    option: SynthesisOption,
22    tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
23}
24
25#[allow(dead_code)]
26/// VoiceAPI TTS Request structure
27#[derive(Debug, Serialize, Deserialize, Clone)]
28struct TtsRequest {
29    text: String,
30    sid: i32,
31    samplerate: i32,
32    speed: f32,
33}
34
35/// VoiceAPI TTS metadata response
36#[derive(Debug, Serialize, Deserialize)]
37struct TtsResult {
38    progress: f32,
39    elapsed: String,
40    duration: String,
41    size: i32,
42}
43
44impl VoiceApiTtsClient {
45    pub fn create(_streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
46        let client = Self::new(option.clone());
47        Ok(Box::new(client))
48    }
49    pub fn new(option: SynthesisOption) -> Self {
50        Self { option, tx: None }
51    }
52
53    // construct request url
54    // for non-streaming client, text is Some
55    // session_id is used for tencent cloud tts service, not the session_id of media session
56    fn construct_request_url(option: &SynthesisOption) -> String {
57        let endpoint = option
58            .endpoint
59            .clone()
60            .unwrap_or("ws://localhost:8080".to_string());
61
62        // Convert http endpoint to websocket if needed
63        let ws_endpoint = if endpoint.starts_with("http") {
64            endpoint
65                .replace("http://", "ws://")
66                .replace("https://", "wss://")
67        } else {
68            endpoint
69        };
70        let chunk_size = 4 * 640;
71        format!("{}/tts?chunk_size={}&split=false", ws_endpoint, chunk_size)
72    }
73}
74
75// convert websocket to event stream
76// text and cmd_seq and cache key are used for non-streaming mode (realtime client)
77// text is for debuging purpose
78fn ws_to_event_stream<T>(
79    ws_stream: T,
80    cmd_seq: Option<usize>,
81) -> BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>
82where
83    T: Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
84        + Send
85        + Unpin
86        + 'static,
87{
88    let notify = Arc::new(Notify::new());
89    let notify_clone = notify.clone();
90    ws_stream
91        .take_until(notify.notified_owned())
92        .filter_map(move |message| {
93            let notify = notify_clone.clone();
94            async move {
95                match message {
96                    Ok(Message::Binary(data)) => {
97                        Some((cmd_seq, Ok(SynthesisEvent::AudioChunk(data))))
98                    }
99                    Ok(Message::Text(text)) => {
100                        match serde_json::from_str::<TtsResult>(&text) {
101                            Ok(metadata) => {
102                                debug!(
103                                    "Received metadata: progress={}, elapsed={}, duration={}, size={}",
104                                    metadata.progress,
105                                    metadata.elapsed,
106                                    metadata.duration,
107                                    metadata.size
108                                );
109
110                                if metadata.progress >= 1.0 {
111                                    notify.notify_one();
112                                    return Some((cmd_seq, Ok(SynthesisEvent::Finished)));
113                                }
114                            }
115                            Err(e) => {
116                                notify.notify_one();
117                                warn!("Failed to parse metadata: {}", e);
118                                return Some((
119                                    cmd_seq,
120                                    Err(anyhow::anyhow!(
121                                        "VoiceAPPI TTS error, Failed to parse metadata: {}, {}", text, e)),
122                                ));
123                            }
124                        }
125                        None
126                    }
127                    Ok(Message::Close(_)) => {
128                        notify.notify_one();
129                        warn!("VoiceAPI TTS closed by remote, {:?}", cmd_seq);
130                        None
131                    }
132                    Err(e) => {
133                        notify.notify_one();
134                        Some((
135                            cmd_seq,
136                            Err(anyhow::anyhow!(
137                                "VoiceAPI TTS websocket error: {:?}, {:?}",
138                                cmd_seq,
139                                e
140                            )),
141                        ))
142                    }
143                    _ => None,
144                }
145            }
146        })
147        .boxed()
148}
149#[async_trait]
150impl SynthesisClient for VoiceApiTtsClient {
151    fn provider(&self) -> SynthesisType {
152        SynthesisType::VoiceApi
153    }
154    async fn start(
155        &mut self,
156    ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
157        let (tx, rx) = mpsc::unbounded_channel();
158        self.tx = Some(tx);
159        let client_option = self.option.clone();
160        let max_concurrent_tasks = client_option.max_concurrent_tasks.unwrap_or(1);
161        let stream = UnboundedReceiverStream::new(rx).flat_map_unordered(
162            max_concurrent_tasks,
163            move |(text, cmd_seq, option)| {
164                // each reequest have its own session_id
165                let option = client_option.merge_with(option);
166                let url = Self::construct_request_url(&option);
167                connect_async(url)
168                    .then(async move |res| match res {
169                        Ok((mut ws_stream, _)) => {
170                            ws_stream.send(Message::text(text)).await.ok();
171                            ws_to_event_stream(ws_stream, cmd_seq)
172                        }
173                        Err(e) => {
174                            warn!("VoiceAPI TTS websocket error: {}", e);
175                            stream::empty().boxed()
176                        }
177                    })
178                    .flatten_stream()
179                    .boxed()
180            },
181        );
182        Ok(stream.boxed())
183    }
184
185    async fn synthesize(
186        &mut self,
187        text: &str,
188        cmd_seq: Option<usize>,
189        option: Option<SynthesisOption>,
190    ) -> Result<()> {
191        if let Some(tx) = &self.tx {
192            tx.send((text.to_string(), cmd_seq, option))?;
193        } else {
194            return Err(anyhow::anyhow!("VoiceAPI TTS: missing client sender"));
195        };
196        Ok(())
197    }
198
199    async fn stop(&mut self) -> Result<()> {
200        self.tx.take();
201        Ok(())
202    }
203}