use std::io;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crossbeam_channel as channel;
use crate::client::Client;
use crate::message::Message;
#[derive(Debug)]
struct Inner {
pub(crate) sid: u64,
pub(crate) subject: String,
pub(crate) messages: channel::Receiver<Message>,
pub(crate) client: Client,
}
impl Drop for Inner {
fn drop(&mut self) {
self.client.unsubscribe(self.sid).ok();
}
}
#[derive(Clone, Debug)]
pub struct Subscription(Arc<Inner>);
impl Subscription {
pub(crate) fn new(
sid: u64,
subject: String,
messages: channel::Receiver<Message>,
client: Client,
) -> Subscription {
Subscription(Arc::new(Inner {
sid,
subject,
messages,
client,
}))
}
pub fn receiver(&self) -> &channel::Receiver<Message> {
&self.0.messages
}
pub fn next(&self) -> Option<Message> {
self.0.messages.recv().ok()
}
pub fn try_next(&self) -> Option<Message> {
self.0.messages.try_recv().ok()
}
pub fn next_timeout(&self, timeout: Duration) -> io::Result<Message> {
match self.0.messages.recv_timeout(timeout) {
Ok(msg) => Ok(msg),
Err(channel::RecvTimeoutError::Timeout) => Err(io::Error::new(
io::ErrorKind::TimedOut,
"next_timeout: timed out",
)),
Err(channel::RecvTimeoutError::Disconnected) => Err(io::Error::new(
io::ErrorKind::Other,
"next_timeout: unsubscribed",
)),
}
}
pub fn messages(&self) -> Iter<'_> {
Iter { subscription: self }
}
pub fn iter(&self) -> Iter<'_> {
Iter { subscription: self }
}
pub fn try_iter(&self) -> TryIter<'_> {
TryIter { subscription: self }
}
pub fn timeout_iter(&self, timeout: Duration) -> TimeoutIter<'_> {
TimeoutIter {
subscription: self,
to: timeout,
}
}
pub fn with_handler<F>(self, handler: F) -> Handler
where
F: Fn(Message) -> io::Result<()> + Send + 'static,
{
let sub = self.clone();
thread::Builder::new()
.name(format!("nats_subscriber_{}_{}", self.0.sid, self.0.subject))
.spawn(move || {
for m in &sub {
if let Err(e) = handler(m) {
log::error!("Error in callback! {:?}", e);
}
}
})
.expect("threads should be spawnable");
Handler { sub: self }
}
pub fn set_message_limits(&self, limit: usize) {
self.0
.client
.state
.read
.lock()
.subscriptions
.entry(self.0.sid)
.and_modify(|sub| sub.pending_messages_limit = Some(limit));
}
pub fn dropped_messages(&self) -> io::Result<usize> {
self.0
.client
.state
.read
.lock()
.subscriptions
.get(&self.0.sid)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "subscription not found"))
.map(|subscription| subscription.dropped_messages)
}
pub fn unsubscribe(self) -> io::Result<()> {
self.0.client.unsubscribe(self.0.sid)?;
while self.0.messages.try_recv().is_ok() {}
Ok(())
}
pub fn close(self) -> io::Result<()> {
self.unsubscribe()
}
pub fn drain(&self) -> io::Result<()> {
self.0.client.flush(crate::DEFAULT_FLUSH_TIMEOUT)?;
self.0.client.unsubscribe(self.0.sid)?;
Ok(())
}
}
impl IntoIterator for Subscription {
type Item = Message;
type IntoIter = IntoIter;
fn into_iter(self) -> IntoIter {
IntoIter { subscription: self }
}
}
impl<'a> IntoIterator for &'a Subscription {
type Item = Message;
type IntoIter = Iter<'a>;
fn into_iter(self) -> Iter<'a> {
Iter { subscription: self }
}
}
pub struct Handler {
sub: Subscription,
}
impl Handler {
pub fn unsubscribe(self) -> io::Result<()> {
self.sub.drain()
}
}
pub struct TryIter<'a> {
subscription: &'a Subscription,
}
impl<'a> Iterator for TryIter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.try_next()
}
}
pub struct Iter<'a> {
subscription: &'a Subscription,
}
impl<'a> Iterator for Iter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
pub struct IntoIter {
subscription: Subscription,
}
impl Iterator for IntoIter {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
pub struct TimeoutIter<'a> {
subscription: &'a Subscription,
to: Duration,
}
impl<'a> Iterator for TimeoutIter<'a> {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next_timeout(self.to).ok()
}
}