1use std::ops::Deref;
2
3use tokio::sync::broadcast::error::RecvError;
4use tokio::sync::{broadcast, mpsc};
5
6use crate::stt::{Context, EventfulContext, Phrase, Recognizer};
7use crate::Result;
8
9#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
11pub struct UnicastSubscriber {
12 rx: mpsc::Receiver<Phrase>,
13}
14
15impl UnicastSubscriber {
16 pub async fn recognize(&mut self) -> Phrase {
18 self.rx.recv().await.unwrap()
19 }
20}
21
22#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
24#[derive(Debug, Clone, PartialEq)]
25pub enum BroadcastResult {
26 Phrase(Phrase),
28 Lagged(u64),
30}
31
32#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
34pub struct BroadcastSubscriber {
35 rx: broadcast::Receiver<Phrase>,
36}
37
38impl BroadcastSubscriber {
39 pub async fn recognize(&mut self) -> BroadcastResult {
41 match self.rx.recv().await {
42 Ok(phrase) => BroadcastResult::Phrase(phrase),
43 Err(RecvError::Lagged(skipped)) => BroadcastResult::Lagged(skipped),
44 Err(err) => panic!("{}", err),
45 }
46 }
47}
48
49#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
51pub struct UnicastContext {
52 base: EventfulContext,
53}
54
55impl UnicastContext {
56 pub fn new(recognizer: &Recognizer, buffer: usize) -> Result<(Self, UnicastSubscriber)> {
60 let (tx, rx) = mpsc::channel::<Phrase>(buffer);
61 let handler = move |phrase| {
62 let _ = tx.try_send(phrase);
63 };
64 Ok((
65 Self {
66 base: EventfulContext::new(recognizer, handler)?,
67 },
68 UnicastSubscriber { rx },
69 ))
70 }
71}
72
73impl Deref for UnicastContext {
74 type Target = Context;
75 fn deref(&self) -> &Self::Target {
76 &self.base
77 }
78}
79
80#[cfg_attr(docsrs, doc(cfg(feature = "tokio-stt")))]
82pub struct BroadcastContext {
83 base: EventfulContext,
84 tx: broadcast::Sender<Phrase>,
85}
86
87impl BroadcastContext {
88 pub fn new(recognizer: &Recognizer, buffer: usize) -> Result<(Self, BroadcastSubscriber)> {
93 let (tx, rx) = broadcast::channel::<Phrase>(buffer);
94 let handler = {
95 let tx = tx.clone();
96 move |phrase| {
97 let _ = tx.send(phrase);
98 }
99 };
100 Ok((
101 Self {
102 base: EventfulContext::new(recognizer, handler)?,
103 tx,
104 },
105 BroadcastSubscriber { rx },
106 ))
107 }
108
109 pub fn subscribe(&self) -> BroadcastSubscriber {
111 BroadcastSubscriber {
112 rx: self.tx.subscribe(),
113 }
114 }
115}
116
117impl Deref for BroadcastContext {
118 type Target = Context;
119 fn deref(&self) -> &Self::Target {
120 &self.base
121 }
122}