active_call/synthesis/
voiceapi.rs1use super::{SynthesisClient, SynthesisOption, SynthesisType};
2use crate::synthesis::SynthesisEvent;
3use anyhow::Result;
4use async_trait::async_trait;
5use futures::{
6 FutureExt, SinkExt, Stream, StreamExt,
7 stream::{self, BoxStream},
8};
9use serde::{Deserialize, Serialize};
10use std::sync::Arc;
11use tokio::sync::{Notify, mpsc};
12use tokio_stream::wrappers::UnboundedReceiverStream;
13use tokio_tungstenite::{connect_async, tungstenite::Message};
14use tracing::{debug, warn};
15
16#[derive(Debug)]
20pub struct VoiceApiTtsClient {
21 option: SynthesisOption,
22 tx: Option<mpsc::UnboundedSender<(String, Option<usize>, Option<SynthesisOption>)>>,
23}
24
25#[allow(dead_code)]
26#[derive(Debug, Serialize, Deserialize, Clone)]
28struct TtsRequest {
29 text: String,
30 sid: i32,
31 samplerate: i32,
32 speed: f32,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
37struct TtsResult {
38 progress: f32,
39 elapsed: String,
40 duration: String,
41 size: i32,
42}
43
44impl VoiceApiTtsClient {
45 pub fn create(_streaming: bool, option: &SynthesisOption) -> Result<Box<dyn SynthesisClient>> {
46 let client = Self::new(option.clone());
47 Ok(Box::new(client))
48 }
49 pub fn new(option: SynthesisOption) -> Self {
50 Self { option, tx: None }
51 }
52
53 fn construct_request_url(option: &SynthesisOption) -> String {
57 let endpoint = option
58 .endpoint
59 .clone()
60 .unwrap_or("ws://localhost:8080".to_string());
61
62 let ws_endpoint = if endpoint.starts_with("http") {
64 endpoint
65 .replace("http://", "ws://")
66 .replace("https://", "wss://")
67 } else {
68 endpoint
69 };
70 let chunk_size = 4 * 640;
71 format!("{}/tts?chunk_size={}&split=false", ws_endpoint, chunk_size)
72 }
73}
74
75fn ws_to_event_stream<T>(
79 ws_stream: T,
80 cmd_seq: Option<usize>,
81) -> BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>
82where
83 T: Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
84 + Send
85 + Unpin
86 + 'static,
87{
88 let notify = Arc::new(Notify::new());
89 let notify_clone = notify.clone();
90 ws_stream
91 .take_until(notify.notified_owned())
92 .filter_map(move |message| {
93 let notify = notify_clone.clone();
94 async move {
95 match message {
96 Ok(Message::Binary(data)) => {
97 Some((cmd_seq, Ok(SynthesisEvent::AudioChunk(data))))
98 }
99 Ok(Message::Text(text)) => {
100 match serde_json::from_str::<TtsResult>(&text) {
101 Ok(metadata) => {
102 debug!(
103 "Received metadata: progress={}, elapsed={}, duration={}, size={}",
104 metadata.progress,
105 metadata.elapsed,
106 metadata.duration,
107 metadata.size
108 );
109
110 if metadata.progress >= 1.0 {
111 notify.notify_one();
112 return Some((cmd_seq, Ok(SynthesisEvent::Finished)));
113 }
114 }
115 Err(e) => {
116 notify.notify_one();
117 warn!("Failed to parse metadata: {}", e);
118 return Some((
119 cmd_seq,
120 Err(anyhow::anyhow!(
121 "VoiceAPPI TTS error, Failed to parse metadata: {}, {}", text, e)),
122 ));
123 }
124 }
125 None
126 }
127 Ok(Message::Close(_)) => {
128 notify.notify_one();
129 warn!("VoiceAPI TTS closed by remote, {:?}", cmd_seq);
130 None
131 }
132 Err(e) => {
133 notify.notify_one();
134 Some((
135 cmd_seq,
136 Err(anyhow::anyhow!(
137 "VoiceAPI TTS websocket error: {:?}, {:?}",
138 cmd_seq,
139 e
140 )),
141 ))
142 }
143 _ => None,
144 }
145 }
146 })
147 .boxed()
148}
149#[async_trait]
150impl SynthesisClient for VoiceApiTtsClient {
151 fn provider(&self) -> SynthesisType {
152 SynthesisType::VoiceApi
153 }
154 async fn start(
155 &mut self,
156 ) -> Result<BoxStream<'static, (Option<usize>, Result<SynthesisEvent>)>> {
157 let (tx, rx) = mpsc::unbounded_channel();
158 self.tx = Some(tx);
159 let client_option = self.option.clone();
160 let max_concurrent_tasks = client_option.max_concurrent_tasks.unwrap_or(1);
161 let stream = UnboundedReceiverStream::new(rx).flat_map_unordered(
162 max_concurrent_tasks,
163 move |(text, cmd_seq, option)| {
164 let option = client_option.merge_with(option);
166 let url = Self::construct_request_url(&option);
167 connect_async(url)
168 .then(async move |res| match res {
169 Ok((mut ws_stream, _)) => {
170 ws_stream.send(Message::text(text)).await.ok();
171 ws_to_event_stream(ws_stream, cmd_seq)
172 }
173 Err(e) => {
174 warn!("VoiceAPI TTS websocket error: {}", e);
175 stream::empty().boxed()
176 }
177 })
178 .flatten_stream()
179 .boxed()
180 },
181 );
182 Ok(stream.boxed())
183 }
184
185 async fn synthesize(
186 &mut self,
187 text: &str,
188 cmd_seq: Option<usize>,
189 option: Option<SynthesisOption>,
190 ) -> Result<()> {
191 if let Some(tx) = &self.tx {
192 tx.send((text.to_string(), cmd_seq, option))?;
193 } else {
194 return Err(anyhow::anyhow!("VoiceAPI TTS: missing client sender"));
195 };
196 Ok(())
197 }
198
199 async fn stop(&mut self) -> Result<()> {
200 self.tx.take();
201 Ok(())
202 }
203}