use crate::client::{Client, WebSocket};
use crate::protocol::tts as p;
use anyhow::Result;
#[derive(Debug)]
pub struct TtsStream {
ws: WebSocket,
ready: p::Ready,
}
#[derive(Debug)]
pub struct TtsStreamSender {
ws: crate::client::WebSocketSender,
}
#[derive(Debug)]
pub struct TtsStreamReceiver {
ws: crate::client::WebSocketReceiver,
}
impl TtsStream {
pub fn split(self) -> (TtsStreamSender, TtsStreamReceiver) {
use futures_util::StreamExt;
let (ws_tx, ws_rx) = self.ws.split();
(TtsStreamSender { ws: ws_tx }, TtsStreamReceiver { ws: ws_rx })
}
pub async fn new(setup: p::Setup, client: &Client) -> Result<Self> {
use futures_util::SinkExt;
let mut ws = client.ws_connect("speech/tts").await?;
let setup = serde_json::to_string(&p::Request::Setup(setup))?;
ws.send(tokio_tungstenite::tungstenite::Message::Text(setup.into())).await?;
let first_msg = crate::client::next_message(&mut ws).await?;
let first_msg = match first_msg {
None => anyhow::bail!("connection closed by server"),
Some(m) => m,
};
let first_msg: p::Response = serde_json::from_str(&first_msg)?;
let ready = match first_msg {
p::Response::Ready(ready) => ready,
p::Response::Error { code, message, .. } => {
anyhow::bail!("error from server {code:?}: {message}")
}
_ => anyhow::bail!("unexpected first message from server: {:?}", first_msg),
};
Ok(Self { ws, ready })
}
pub async fn send_eos(&mut self) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::EndOfStream { client_req_id: None };
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn send_text(&mut self, text: &str) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::Text(p::Text { text: text.to_string(), client_req_id: None });
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn next_message(&mut self) -> Result<Option<p::Response>> {
let msg = crate::client::next_message(&mut self.ws).await?;
let msg = match msg {
None => return Ok(None),
Some(m) => m,
};
let msg: p::Response = serde_json::from_str(&msg)?;
match &msg {
p::Response::EndOfStream { .. } => return Ok(None),
p::Response::Error { code, message, .. } => {
anyhow::bail!("error from server {code:?}: {message}")
}
_ => {}
}
Ok(Some(msg))
}
pub fn sample_rate(&self) -> u32 {
self.ready.sample_rate
}
pub fn frame_size(&self) -> u32 {
self.ready.frame_size
}
pub fn audio_stream_names(&self) -> &[String] {
&self.ready.audio_stream_names
}
pub fn text_stream_names(&self) -> &[String] {
&self.ready.text_stream_names
}
pub fn request_id(&self) -> &str {
&self.ready.request_id
}
}
#[derive(Debug, Clone)]
pub struct TtsResult {
raw_data: Vec<u8>,
request_id: String,
sample_rate: u32,
text_with_timestamps: Vec<crate::TextWithTimestamps>,
}
impl TtsResult {
pub fn raw_data(&self) -> &[u8] {
&self.raw_data
}
pub fn request_id(&self) -> &str {
&self.request_id
}
pub fn sample_rate(&self) -> u32 {
self.sample_rate
}
pub fn text_with_timestamps(&self) -> &[crate::TextWithTimestamps] {
&self.text_with_timestamps
}
}
impl TtsStreamSender {
pub async fn send_eos(&mut self) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::EndOfStream { client_req_id: None };
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn send_text(&mut self, text: &str) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::Text(p::Text { text: text.to_string(), client_req_id: None });
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
}
impl TtsStreamReceiver {
pub async fn next_message(&mut self) -> Result<Option<p::Response>> {
let msg = crate::client::next_message_receiver(&mut self.ws).await?;
let msg = match msg {
None => return Ok(None),
Some(m) => m,
};
let msg: p::Response = serde_json::from_str(&msg)?;
match &msg {
p::Response::EndOfStream { .. } => return Ok(None),
p::Response::Error { code, message, .. } => {
anyhow::bail!("error from server {code:?}: {message}")
}
_ => {}
}
Ok(Some(msg))
}
}
pub async fn tts(text: &str, setup: p::Setup, client: &Client) -> Result<TtsResult> {
let mut stream = TtsStream::new(setup, client).await?;
stream.send_text(text).await?;
stream.send_eos().await?;
let mut all_raw = vec![];
let mut text_with_timestamps = vec![];
while let Some(data) = stream.next_message().await? {
match data {
p::Response::Audio(audio) => {
let raw = audio.raw_audio()?;
all_raw.push(raw)
}
p::Response::Text(text) => {
text_with_timestamps.push(crate::TextWithTimestamps {
text: text.text.clone(),
start_s: text.start_s,
stop_s: text.stop_s,
});
}
_ => {}
}
}
let raw_data = all_raw.concat();
Ok(TtsResult {
raw_data,
text_with_timestamps,
request_id: stream.request_id().to_string(),
sample_rate: stream.sample_rate(),
})
}
#[derive(Debug)]
pub struct TtsMultiplexStream {
ws: WebSocket,
}
#[derive(Debug)]
pub struct TtsMultiplexSender {
ws: crate::client::WebSocketSender,
}
#[derive(Debug)]
pub struct TtsMultiplexReceiver {
ws: crate::client::WebSocketReceiver,
}
impl TtsMultiplexStream {
pub async fn connect(client: &Client) -> Result<Self> {
let ws = client.ws_connect("speech/tts").await?;
Ok(Self { ws })
}
pub async fn send_setup(&mut self, setup: p::Setup) -> Result<()> {
use futures_util::SinkExt;
if setup.client_req_id.is_none() {
anyhow::bail!("client_req_id must be set for multiplexed requests");
}
if setup.close_ws_on_eos != Some(false) {
anyhow::bail!("close_ws_on_eos must be set to false for multiplexed requests");
}
let req = serde_json::to_string(&p::Request::Setup(setup))?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn send_text(&mut self, text: &str, client_req_id: &str) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::Text(p::Text {
text: text.to_string(),
client_req_id: Some(client_req_id.to_string()),
});
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn send_eos(&mut self, client_req_id: &str) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::EndOfStream { client_req_id: Some(client_req_id.to_string()) };
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn next_message(&mut self) -> Result<Option<p::Response>> {
let msg = crate::client::next_message(&mut self.ws).await?;
let msg = match msg {
None => return Ok(None),
Some(m) => m,
};
let msg: p::Response = serde_json::from_str(&msg)?;
Ok(Some(msg))
}
pub fn split(self) -> (TtsMultiplexSender, TtsMultiplexReceiver) {
use futures_util::StreamExt;
let (ws_tx, ws_rx) = self.ws.split();
(TtsMultiplexSender { ws: ws_tx }, TtsMultiplexReceiver { ws: ws_rx })
}
}
impl TtsMultiplexSender {
pub async fn send_setup(&mut self, setup: p::Setup) -> Result<()> {
use futures_util::SinkExt;
if setup.client_req_id.is_none() {
anyhow::bail!("client_req_id must be set for multiplexed requests");
}
if setup.close_ws_on_eos != Some(false) {
anyhow::bail!("close_ws_on_eos must be set to false for multiplexed requests");
}
let req = serde_json::to_string(&p::Request::Setup(setup))?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn send_text(&mut self, text: &str, client_req_id: &str) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::Text(p::Text {
text: text.to_string(),
client_req_id: Some(client_req_id.to_string()),
});
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn send_eos(&mut self, client_req_id: &str) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::EndOfStream { client_req_id: Some(client_req_id.to_string()) };
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
}
impl TtsMultiplexReceiver {
pub async fn next_message(&mut self) -> Result<Option<p::Response>> {
let msg = crate::client::next_message_receiver(&mut self.ws).await?;
let msg = match msg {
None => return Ok(None),
Some(m) => m,
};
let msg: p::Response = serde_json::from_str(&msg)?;
Ok(Some(msg))
}
}
pub async fn tts_stream(setup: p::Setup, client: &Client) -> Result<TtsStream> {
TtsStream::new(setup, client).await
}