#![allow(clippy::manual_map)]
use crate::api::grpc::google::cloud::speechtotext::v1::{
speech_client::SpeechClient, streaming_recognize_request::StreamingRequest,
LongRunningRecognizeRequest, LongRunningRecognizeResponse, RecognizeRequest, RecognizeResponse,
StreamingRecognitionConfig, StreamingRecognizeRequest, StreamingRecognizeResponse,
};
use crate::api::grpc::google::longrunning::{
operation::Result as OperationResult, operations_client::OperationsClient, GetOperationRequest,
Operation,
};
use crate::common::{get_token, new_grpc_channel, new_interceptor, TokenInterceptor};
use crate::errors::{Error, Result};
use async_stream::try_stream;
use futures_core::stream::Stream;
use log::*;
use prost::Message;
use std::io::Cursor;
use std::result::Result as StdResult;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::InterceptedService;
use tonic::Response as TonicResponse;
use tonic::Status as TonicStatus;
use tonic::{transport::Channel, Response as GrpcResponse, Streaming};
const GRPC_API_DOMAIN: &str = "speech.googleapis.com";
const GRPC_API_URL: &str = "https://speech.googleapis.com";
/// Google Cloud Speech-to-Text recognizer wrapping the authenticated gRPC
/// speech client.
///
/// Only a subset of the optional fields is populated, depending on which
/// constructor was used:
/// - `create_streaming_recognizer`: `audio_sender` + `audio_receiver`
/// - `create_asynchronous_recognizer`: `operations_client`
/// - `create_synchronous_recognizer`: none of the optional fields
#[derive(Debug)]
pub struct Recognizer {
    /// Authenticated gRPC client for the Speech API.
    speech_client: SpeechClient<InterceptedService<Channel, TokenInterceptor>>,
    /// Client for polling long-running operations (async recognition only).
    operations_client: Option<OperationsClient<InterceptedService<Channel, TokenInterceptor>>>,
    /// Sender half of the audio channel; callers push audio requests here.
    audio_sender: Option<mpsc::Sender<StreamingRecognizeRequest>>,
    /// Receiver half of the audio channel; consumed (taken) when the
    /// streaming RPC is started.
    audio_receiver: Option<mpsc::Receiver<StreamingRecognizeRequest>>,
    /// Sender used by `streaming_recognize` to forward recognition results
    /// to the receiver handed out by `get_streaming_result_receiver`.
    result_sender: Option<mpsc::Sender<StreamingRecognizeResponse>>,
}
impl Recognizer {
pub async fn create_streaming_recognizer(
google_credentials: impl AsRef<str>,
config: StreamingRecognitionConfig,
buffer_size: Option<usize>,
) -> Result<Self> {
let channel = new_grpc_channel(GRPC_API_DOMAIN, GRPC_API_URL, None).await?;
let token_header_val = get_token(google_credentials)?;
let speech_client =
SpeechClient::with_interceptor(channel, new_interceptor(token_header_val));
let (audio_sender, audio_receiver) =
mpsc::channel::<StreamingRecognizeRequest>(buffer_size.unwrap_or(1000));
let streaming_config = StreamingRecognizeRequest {
streaming_request: Some(StreamingRequest::StreamingConfig(config)),
};
audio_sender.send(streaming_config).await?;
Ok(Recognizer {
speech_client,
operations_client: None,
audio_sender: Some(audio_sender),
audio_receiver: Some(audio_receiver),
result_sender: None,
})
}
pub async fn create_asynchronous_recognizer(
google_credentials: impl AsRef<str>,
) -> Result<Self> {
let channel = new_grpc_channel(GRPC_API_DOMAIN, GRPC_API_URL, None).await?;
let token_header_val = get_token(google_credentials)?;
let speech_client = SpeechClient::with_interceptor(
channel.clone(),
new_interceptor(token_header_val.clone()),
);
let operations_client =
OperationsClient::with_interceptor(channel, new_interceptor(token_header_val));
Ok(Recognizer {
speech_client,
operations_client: Some(operations_client),
audio_sender: None,
audio_receiver: None,
result_sender: None,
})
}
pub async fn create_synchronous_recognizer(
google_credentials: impl AsRef<str>,
) -> Result<Self> {
let channel = new_grpc_channel(GRPC_API_DOMAIN, GRPC_API_URL, None).await?;
let token_header_val = get_token(google_credentials)?;
let speech_client =
SpeechClient::with_interceptor(channel, new_interceptor(token_header_val));
Ok(Recognizer {
speech_client,
operations_client: None,
audio_sender: None,
audio_receiver: None,
result_sender: None,
})
}
pub fn get_audio_sink(&mut self) -> Option<mpsc::Sender<StreamingRecognizeRequest>> {
if let Some(audio_sender) = &self.audio_sender {
Some(audio_sender.clone())
} else {
None
}
}
pub fn take_audio_sink(&mut self) -> Option<mpsc::Sender<StreamingRecognizeRequest>> {
if let Some(audio_sender) = self.audio_sender.take() {
Some(audio_sender)
} else {
None
}
}
pub fn drop_audio_sink(&mut self) {
self.audio_sender.take();
}
pub fn get_streaming_result_receiver(
&mut self,
buffer_size: Option<usize>,
) -> mpsc::Receiver<StreamingRecognizeResponse> {
let (result_sender, result_receiver) =
mpsc::channel::<StreamingRecognizeResponse>(buffer_size.unwrap_or(1000));
self.result_sender = Some(result_sender);
result_receiver
}
pub fn streaming_request_from_bytes(audio_bytes: Vec<u8>) -> StreamingRecognizeRequest {
StreamingRecognizeRequest {
streaming_request: Some(StreamingRequest::AudioContent(audio_bytes)),
}
}
#[allow(unreachable_code)]
pub async fn streaming_recognize_async_stream(
&mut self,
) -> impl Stream<Item = Result<StreamingRecognizeResponse>> + '_ {
try_stream! {
if let Some(audio_receiver) = self.audio_receiver.take() {
let streaming_recognize_result: StdResult<
TonicResponse<Streaming<StreamingRecognizeResponse>>,
TonicStatus,
> = self.speech_client.streaming_recognize(ReceiverStream::new(audio_receiver)).await;
let mut response_stream: Streaming<StreamingRecognizeResponse> =
streaming_recognize_result?.into_inner();
trace!("streaming_recognize: entering loop");
while let Some(streaming_recognize_response) = response_stream.message().await? {
yield streaming_recognize_response;
}
trace!("streaming_recognize: leaving loop");
}
}
}
pub async fn streaming_recognize(&mut self) -> Result<()> {
if let Some(audio_receiver) = self.audio_receiver.take() {
let streaming_recognize_result: StdResult<
tonic::Response<Streaming<StreamingRecognizeResponse>>,
tonic::Status,
> = self
.speech_client
.streaming_recognize(ReceiverStream::new(audio_receiver))
.await;
let mut response_stream: Streaming<StreamingRecognizeResponse> =
streaming_recognize_result?.into_inner();
while let Some(streaming_recognize_response) = response_stream.message().await? {
if let Some(result_sender) = &self.result_sender {
result_sender.send(streaming_recognize_response).await?;
}
}
}
Ok(())
}
pub async fn long_running_recognize(
&mut self,
request: LongRunningRecognizeRequest,
) -> Result<GrpcResponse<Operation>> {
Ok(self.speech_client.long_running_recognize(request).await?)
}
pub async fn long_running_wait(
&mut self,
operation: Operation,
check_interval_ms: Option<u64>,
) -> Result<Option<LongRunningRecognizeResponse>> {
let operation_req = GetOperationRequest {
name: operation.name.clone(),
};
loop {
if let Some(oper_client) = &mut self.operations_client {
let tonic_response: TonicResponse<Operation> =
oper_client.get_operation(operation_req.clone()).await?;
let operation = tonic_response.into_inner();
if operation.done {
return if let Some(operation_result) = operation.result {
match operation_result {
OperationResult::Error(rpc_status) => {
error!("Recognizer.long_running_wait rpc error {:?}", rpc_status);
Err(Error::new_with_code(
rpc_status.message,
rpc_status.code.to_string(),
))
}
OperationResult::Response(any_response) => {
let lrr_response: LongRunningRecognizeResponse =
LongRunningRecognizeResponse::decode(&mut Cursor::new(
any_response.value,
))?;
Ok(Some(lrr_response))
}
}
} else {
Ok(None)
};
} else {
sleep(Duration::from_millis(check_interval_ms.unwrap_or(1000))).await;
}
}
}
}
pub async fn recognize(&mut self, request: RecognizeRequest) -> Result<RecognizeResponse> {
let tonic_response: TonicResponse<RecognizeResponse> =
self.speech_client.recognize(request).await?;
Ok(tonic_response.into_inner())
}
}