use std::{
pin::Pin,
task::{Context, Poll},
};
use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use pin_project::{pin_project, pinned_drop};
use crate::{acker::Acker, queue::Publisher, serializer::Serializable};
/// Delivery mode requested when a channel is obtained from a
/// [`ChannelFactory`].
///
/// `Hash` is derived alongside `Eq` so the mode can be used directly as a
/// key in hashed collections (e.g. to cache channels per mode).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ChannelType {
    /// NOTE(review): presumably each message is handed to exactly one
    /// receiver — confirm against the concrete channel implementations.
    ExactlyOnce,
    /// NOTE(review): presumably each message is fanned out to every
    /// receiver — confirm against the concrete channel implementations.
    Broadcast,
}
/// A channel that hands out typed sending and receiving halves.
///
/// The `async` methods are provided through `#[async_trait]`.
#[async_trait]
pub trait Channel {
    /// Sending half for payloads of type `T`; it must implement
    /// `Publisher<T>`.
    type Sender<'a, T: Serializable + 'a>: Publisher<T>;
    /// Handle yielded together with every received message; it must
    /// implement `Acker`.
    type Acker: Acker;
    /// Receiving half: a `Stream` whose items are `(message, acker)` pairs.
    type Receiver<'a, T: Serializable + 'a>: Stream<Item = (T, Self::Acker)>;
    /// Closes the channel.
    ///
    /// # Errors
    /// Failure conditions are defined by the implementation.
    async fn close(&self) -> Result<()>;
    /// Creates a sender for payloads of type `T`.
    ///
    /// # Errors
    /// Failure conditions are defined by the implementation.
    async fn sender<'a, T: Serializable + 'a>(&self) -> Result<Self::Sender<'a, T>>;
    /// Creates a receiver yielding payloads of type `T` paired with an acker.
    ///
    /// # Errors
    /// Failure conditions are defined by the implementation.
    async fn receiver<'a, T: Serializable + 'a>(&self) -> Result<Self::Receiver<'a, T>>;
    /// Synchronous release hook; `LeaseGuard` invokes it from its drop
    /// handler, which is why it is neither `async` nor fallible.
    ///
    /// NOTE(review): what "release" frees is implementation-defined — confirm
    /// with the concrete channel types.
    fn release(&self);
}
/// Factory producing [`Channel`] instances of a single concrete type.
#[async_trait]
pub trait ChannelFactory {
    /// Concrete channel type produced by this factory.
    type Channel: Channel;
    /// Obtains the channel addressed by `identifier` with the requested
    /// `channel_type`.
    ///
    /// NOTE(review): whether a missing identifier is created on demand or
    /// reported as an error is implementation-defined — confirm.
    ///
    /// # Errors
    /// Failure conditions are defined by the implementation.
    async fn get(&self, identifier: String, channel_type: ChannelType) -> Result<Self::Channel>;
    /// Obtains a channel of the requested `channel_type` together with a
    /// `String`.
    ///
    /// NOTE(review): the returned `String` is presumably the identifier of the
    /// newly issued channel, usable later with `get` — confirm.
    ///
    /// # Errors
    /// Failure conditions are defined by the implementation.
    async fn issue(&self, channel_type: ChannelType) -> Result<(String, Self::Channel)>;
}
/// Wrapper that pairs a `pipe` with the [`Channel`] it was obtained from and
/// calls [`Channel::release`] on that channel when the wrapper is dropped.
///
/// The guard dereferences to the pipe and, when the pipe is a `Stream`,
/// is itself a `Stream` with the same items.
#[pin_project(PinnedDrop)]
pub struct LeaseGuard<C: Channel, Pipe> {
    /// The wrapped value; structurally pinned so it can be polled through
    /// the guard.
    #[pin]
    pipe: Pipe,
    /// The leased channel; stored as an `Option` so the drop handler can
    /// take it out before calling `release`.
    channel: Option<C>,
}
impl<C: Channel, Pipe: Stream> Stream for LeaseGuard<C, Pipe> {
type Item = Pipe::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().pipe.poll_next(cx)
}
}
/// Lets the guard be used wherever a shared reference to the pipe is
/// expected.
impl<C: Channel, Pipe> std::ops::Deref for LeaseGuard<C, Pipe> {
    type Target = Pipe;

    /// Borrows the wrapped pipe.
    fn deref(&self) -> &Self::Target {
        let Self { pipe, .. } = self;
        pipe
    }
}
/// Lets the guard be used wherever an exclusive reference to the pipe is
/// expected.
impl<C: Channel, Pipe> std::ops::DerefMut for LeaseGuard<C, Pipe> {
    /// Mutably borrows the wrapped pipe.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { pipe, .. } = self;
        pipe
    }
}
impl<C: Channel, Pipe> LeaseGuard<C, Pipe> {
    /// Wraps `pipe` so that `channel` is released once the returned guard
    /// is dropped.
    pub fn new(channel: C, pipe: Pipe) -> Self {
        // Stored as `Some` so the drop handler can move the channel out.
        let channel = Some(channel);
        Self { pipe, channel }
    }
}
/// Drop handler: releases the leased channel, if it is still held.
#[pinned_drop]
impl<C: Channel, Pipe> PinnedDrop for LeaseGuard<C, Pipe> {
    fn drop(self: Pin<&mut Self>) {
        // Move the channel out of its slot first, leaving `None` behind.
        let lease = self.project().channel.take();
        if let Some(held) = lease {
            held.release();
        }
    }
}
// Public submodules of this module.
// NOTE(review): presumably these hold concrete `Channel` implementations —
// confirm against the module sources.
pub mod coordinated_channel;
pub mod queue;