use std::ops::Deref;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, mpsc};
use crate::stt::{Context, EventfulContext, Phrase, Recognizer};
use crate::Result;
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
pub struct UnicastSubscriber {
rx: mpsc::Receiver<Phrase>,
}
impl UnicastSubscriber {
pub async fn recognize(&mut self) -> Phrase {
self.rx.recv().await.unwrap()
}
}
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
#[derive(Debug, Clone, PartialEq)]
pub enum BroadcastResult {
Phrase(Phrase),
Lagged(u64),
}
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
pub struct BroadcastSubscriber {
rx: broadcast::Receiver<Phrase>,
}
impl BroadcastSubscriber {
pub async fn recognize(&mut self) -> BroadcastResult {
match self.rx.recv().await {
Ok(phrase) => BroadcastResult::Phrase(phrase),
Err(RecvError::Lagged(skipped)) => BroadcastResult::Lagged(skipped),
Err(err) => panic!("{}", err),
}
}
}
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
pub struct UnicastContext {
base: EventfulContext,
}
impl UnicastContext {
pub fn new(recognizer: &Recognizer, buffer: usize) -> Result<(Self, UnicastSubscriber)> {
let (tx, rx) = mpsc::channel::<Phrase>(buffer);
let handler = move |phrase| {
let _ = tx.try_send(phrase);
};
Ok((
Self {
base: EventfulContext::new(recognizer, handler)?,
},
UnicastSubscriber { rx },
))
}
}
impl Deref for UnicastContext {
type Target = Context;
fn deref(&self) -> &Self::Target {
&self.base
}
}
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
pub struct BroadcastContext {
base: EventfulContext,
tx: broadcast::Sender<Phrase>,
}
impl BroadcastContext {
pub fn new(recognizer: &Recognizer, buffer: usize) -> Result<(Self, BroadcastSubscriber)> {
let (tx, rx) = broadcast::channel::<Phrase>(buffer);
let handler = {
let tx = tx.clone();
move |phrase| {
let _ = tx.send(phrase);
}
};
Ok((
Self {
base: EventfulContext::new(recognizer, handler)?,
tx,
},
BroadcastSubscriber { rx },
))
}
pub fn subscribe(&self) -> BroadcastSubscriber {
BroadcastSubscriber {
rx: self.tx.subscribe(),
}
}
}
impl Deref for BroadcastContext {
type Target = Context;
fn deref(&self) -> &Self::Target {
&self.base
}
}