use super::core::{Task, TaskUnsrv, TaskUnsub};
use crate::protocol::{MsgId, UnsrvMsg, UnsubMsg};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub struct Subscription<T> {
receiver: UnboundedReceiver<T>,
_subscription_into: SubscriptionInto,
}
impl<T> Subscription<T> {
pub(crate) fn new(
receiver: UnboundedReceiver<T>,
subscription_into: SubscriptionInto,
) -> Subscription<T> {
Subscription {
receiver,
_subscription_into: subscription_into,
}
}
pub async fn recv(&mut self) -> Option<T> {
self.receiver.recv().await
}
}
pub struct SubscriptionInto {
topic: String,
task_sender: UnboundedSender<Task>,
}
impl SubscriptionInto {
pub(crate) fn new(topic: &str, task_sender: UnboundedSender<Task>) -> SubscriptionInto {
SubscriptionInto {
task_sender,
topic: topic.into(),
}
}
}
impl Drop for SubscriptionInto {
fn drop(&mut self) {
let task = Task::Unsub(TaskUnsub {
msg: UnsubMsg {
topic: self.topic.clone(),
},
});
let _ = self.task_sender.send(task);
}
}
pub struct RequestSubscription<T> {
receiver: UnboundedReceiver<(String, MsgId, T)>,
topic: String,
task_sender: UnboundedSender<Task>,
}
impl<T> RequestSubscription<T> {
pub(crate) fn new(
topic: &str,
receiver: UnboundedReceiver<(String, MsgId, T)>,
task_sender: UnboundedSender<Task>,
) -> RequestSubscription<T> {
RequestSubscription {
receiver,
task_sender,
topic: topic.into(),
}
}
pub async fn recv(&mut self) -> Option<(String, MsgId, T)> {
self.receiver.recv().await
}
}
impl<T> Drop for RequestSubscription<T> {
fn drop(&mut self) {
let task = Task::Unsrv(TaskUnsrv {
msg: UnsrvMsg {
topic: self.topic.clone(),
},
});
let _ = self.task_sender.send(task);
}
}