use crate::client::{Client, WebSocket};
use crate::protocol::stt as p;
use anyhow::Result;
/// A streaming speech-to-text session over a single WebSocket connection.
///
/// Created by [`SttStream::new`], which sends the setup request and stores the
/// server's `Ready` response alongside the socket.
#[derive(Debug)]
pub struct SttStream {
/// WebSocket used for both sending requests and receiving responses.
ws: WebSocket,
/// The `Ready` response received from the server during [`SttStream::new`].
ready: p::Ready,
}
/// Sending half of an [`SttStream`], as returned by [`SttStream::split`].
#[derive(Debug)]
pub struct SttStreamSender {
/// Write half of the split WebSocket.
ws: crate::client::WebSocketSender,
}
/// Receiving half of an [`SttStream`], as returned by [`SttStream::split`].
#[derive(Debug)]
pub struct SttStreamReceiver {
/// Read half of the split WebSocket.
ws: crate::client::WebSocketReceiver,
}
impl SttStream {
pub fn split(self) -> (SttStreamSender, SttStreamReceiver) {
use futures_util::StreamExt;
let (ws_tx, ws_rx) = self.ws.split();
(SttStreamSender { ws: ws_tx }, SttStreamReceiver { ws: ws_rx })
}
pub async fn new(setup: p::Setup, client: &Client) -> Result<Self> {
use futures_util::SinkExt;
let mut ws = client.ws_connect("speech/asr").await?;
let setup = serde_json::to_string(&p::Request::Setup(setup))?;
ws.send(tokio_tungstenite::tungstenite::Message::Text(setup.into())).await?;
let first_msg = crate::client::next_message(&mut ws).await?;
let first_msg = match first_msg {
None => anyhow::bail!("connection closed by server"),
Some(m) => m,
};
let first_msg: p::Response = serde_json::from_str(&first_msg)?;
let ready = match first_msg {
p::Response::Ready(ready) => ready,
p::Response::Error { code, message } => {
anyhow::bail!("error from server {code}: {message}")
}
_ => anyhow::bail!("unexpected first message from server: {:?}", first_msg),
};
Ok(Self { ws, ready })
}
pub async fn send_eos(&mut self) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::EndOfStream;
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn send_audio(&mut self, audio: Vec<u8>) -> Result<()> {
use base64::prelude::*;
use futures_util::SinkExt;
let audio = base64::engine::general_purpose::STANDARD.encode(&audio);
let req = p::Request::Audio(p::Audio { audio });
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn send_audio_base64(&mut self, audio_b64: String) -> Result<()> {
use futures_util::SinkExt;
let req = p::Request::Audio(p::Audio { audio: audio_b64 });
let req = serde_json::to_string(&req)?;
self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
Ok(())
}
pub async fn next_message(&mut self) -> Result<Option<p::Response>> {
let msg = crate::client::next_message(&mut self.ws).await?;
let msg = match msg {
None => return Ok(None),
Some(m) => m,
};
let msg: p::Response = serde_json::from_str(&msg)?;
match &msg {
p::Response::EndOfStream => return Ok(None),
p::Response::Error { code, message } => {
anyhow::bail!("error from server {code}: {message}")
}
_ => {}
}
Ok(Some(msg))
}
pub fn sample_rate(&self) -> u32 {
self.ready.sample_rate
}
pub fn frame_size(&self) -> u32 {
self.ready.frame_size
}
pub fn text_stream_names(&self) -> &[String] {
&self.ready.text_stream_names
}
pub fn request_id(&self) -> &str {
&self.ready.request_id
}
}
/// Outcome of a one-shot transcription performed by [`stt`].
#[derive(Debug, Clone)]
pub struct SttResult {
/// Request identifier copied from the stream used for the transcription.
request_id: String,
/// Sample rate copied from the stream used for the transcription.
sample_rate: u32,
/// Text segments collected from the server's `Text` / `EndText` responses.
text_with_timestamps: Vec<crate::TextWithTimestamps>,
}
impl SttResult {
    /// Request identifier of the session that produced this result.
    pub fn request_id(&self) -> &str {
        self.request_id.as_str()
    }

    /// Sample rate reported by the session that produced this result.
    pub fn sample_rate(&self) -> u32 {
        self.sample_rate
    }

    /// Text segments in the order they were received.
    pub fn text_with_timestamps(&self) -> &[crate::TextWithTimestamps] {
        self.text_with_timestamps.as_slice()
    }
}
pub async fn stt(audio: Vec<u8>, setup: p::Setup, client: &Client) -> Result<SttResult> {
let mut stream = SttStream::new(setup, client).await?;
for audio in audio.chunks(1920) {
stream.send_audio(audio.to_vec()).await?;
}
stream.send_eos().await?;
let mut text_with_timestamps = vec![];
while let Some(data) = stream.next_message().await? {
match data {
p::Response::Text(text) => {
let twt = crate::TextWithTimestamps {
text: text.text,
start_s: text.start_s,
stop_s: text.start_s, };
text_with_timestamps.push(twt)
}
p::Response::EndText(e) => {
text_with_timestamps.last_mut().iter_mut().for_each(|v| v.stop_s = e.stop_s)
}
_ => {}
}
}
Ok(SttResult {
text_with_timestamps,
request_id: stream.request_id().to_string(),
sample_rate: stream.sample_rate(),
})
}
impl SttStreamSender {
    /// Serializes `req` to JSON and sends it as one WebSocket text message.
    ///
    /// Shared by all the public `send_*` methods so the wire format is defined
    /// in exactly one place.
    async fn send_request(&mut self, req: p::Request) -> Result<()> {
        use futures_util::SinkExt;
        let req = serde_json::to_string(&req)?;
        self.ws.send(tokio_tungstenite::tungstenite::Message::Text(req.into())).await?;
        Ok(())
    }

    /// Sends an `EndOfStream` request to the server.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be serialized or sent.
    pub async fn send_eos(&mut self) -> Result<()> {
        self.send_request(p::Request::EndOfStream).await
    }

    /// Base64-encodes `audio` (standard alphabet) and sends it as an `Audio`
    /// request.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be serialized or sent.
    pub async fn send_audio(&mut self, audio: Vec<u8>) -> Result<()> {
        use base64::prelude::*;
        let audio_b64 = base64::engine::general_purpose::STANDARD.encode(&audio);
        self.send_audio_base64(audio_b64).await
    }

    /// Sends an `Audio` request whose payload is `audio_b64`, passed through
    /// unchanged (no validation or re-encoding is performed here).
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be serialized or sent.
    pub async fn send_audio_base64(&mut self, audio_b64: String) -> Result<()> {
        self.send_request(p::Request::Audio(p::Audio { audio: audio_b64 })).await
    }
}
impl SttStreamReceiver {
    /// Receives and parses the next response from the server.
    ///
    /// Returns `Ok(None)` when the socket yields no further message or when
    /// the server sends `EndOfStream`; any other non-error response is
    /// returned as `Ok(Some(_))`.
    ///
    /// # Errors
    ///
    /// Returns an error if receiving or parsing fails, or if the server sends
    /// an `Error` response.
    pub async fn next_message(&mut self) -> Result<Option<p::Response>> {
        let raw = match crate::client::next_message_receiver(&mut self.ws).await? {
            Some(raw) => raw,
            None => return Ok(None),
        };
        match serde_json::from_str::<p::Response>(&raw)? {
            p::Response::EndOfStream => Ok(None),
            p::Response::Error { code, message } => {
                anyhow::bail!("error from server {code}: {message}")
            }
            other => Ok(Some(other)),
        }
    }
}
/// Opens a streaming speech-to-text session.
///
/// Free-function form of [`SttStream::new`]; arguments and errors are passed
/// through unchanged.
pub async fn stt_stream(setup: p::Setup, client: &Client) -> Result<SttStream> {
    let stream = SttStream::new(setup, client).await?;
    Ok(stream)
}