use hermes_proto::broker_client::BrokerClient;
use hermes_proto::{Message, Sub, SubscribeRequest};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tracing::{debug, info};
/// Client-side handle for a streaming subscription to the broker.
///
/// Holds the two channel endpoints that connect the caller to the
/// subscription stream set up in `Subscriber::new`.
pub struct Subscriber {
// Sending half of the channel whose receiving half is handed to the broker
// `subscribe` call as the outgoing request stream.
cmd_tx: mpsc::Sender<SubscribeRequest>,
// Receiving half of the channel that the background task fills with
// messages taken from broker responses.
delivery_rx: mpsc::Receiver<Message>,
}
impl Subscriber {
pub async fn new(channel: Channel) -> Result<Self, tonic::Status> {
let mut client = BrokerClient::new(channel);
let (cmd_tx, cmd_rx) = mpsc::channel::<SubscribeRequest>(64);
let (delivery_tx, delivery_rx) = mpsc::channel::<Message>(256);
let response = client.subscribe(ReceiverStream::new(cmd_rx)).await?;
let mut resp_stream = response.into_inner();
tokio::spawn(async move {
while let Ok(Some(resp)) = resp_stream.message().await {
for m in resp.messages {
if delivery_tx.send(m).await.is_err() {
debug!("subscriber delivery channel closed");
return;
}
}
}
debug!("subscriber stream ended");
});
info!("subscriber created");
Ok(Self {
cmd_tx,
delivery_rx,
})
}
pub async fn subscribe(
&self,
subject: impl Into<String>,
queue_group: Option<String>,
) -> Result<(), SubscribeError> {
let subject = subject.into();
info!(
subject = %subject,
queue_group = queue_group.as_deref().unwrap_or("(none)"),
"subscribing to subject"
);
let req = SubscribeRequest {
sub: Some(Sub {
subject,
queue_group: queue_group.unwrap_or_default(),
}),
};
self.cmd_tx
.send(req)
.await
.map_err(|_| SubscribeError::Disconnected)?;
Ok(())
}
pub async fn recv(&mut self) -> Option<Message> {
self.delivery_rx.recv().await
}
}
/// Errors returned by `Subscriber::subscribe`.
#[derive(Debug, thiserror::Error)]
pub enum SubscribeError {
/// The subscribe request could not be queued because sending on the
/// outgoing request channel failed.
#[error("disconnected from broker")]
Disconnected,
}