use super::*;
use async_trait::async_trait;
use core::pin::Pin;
use futures::{
future::Aborted,
stream::StreamExt,
task::{AtomicWaker, Context, Poll},
};
use pin_project_lite::pin_project;
use prometheus::core::Collector;
use std::sync::atomic::{AtomicBool, Ordering};
pub use tokio::net::TcpListener;
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::IntervalStream;
pub use tokio_stream::wrappers::TcpListenerStream;
#[derive(Debug)]
struct AbortInner {
waker: AtomicWaker,
aborted: AtomicBool,
}
#[derive(Debug, Clone)]
pub struct AbortHandle {
inner: std::sync::Arc<AbortInner>,
}
impl AbortHandle {
pub fn abort(&self) {
self.inner.aborted.store(true, Ordering::Relaxed);
self.inner.waker.wake();
}
pub fn new_pair() -> (Self, AbortRegistration) {
let inner = std::sync::Arc::new(AbortInner {
waker: AtomicWaker::new(),
aborted: AtomicBool::new(false),
});
(AbortHandle { inner: inner.clone() }, AbortRegistration { inner })
}
}
#[derive(Debug, Clone)]
pub struct AbortRegistration {
inner: std::sync::Arc<AbortInner>,
}
impl AbortRegistration {
pub fn is_aborted(&self) -> bool {
self.inner.aborted.load(Ordering::Acquire)
}
}
pin_project! {
#[derive(Debug, Clone)]
#[must_use = "futures/streams do nothing unless you poll them"]
pub struct Abortable<T> {
#[pin]
task: T,
inner: std::sync::Arc<AbortInner>,
}
}
impl<T> std::ops::Deref for Abortable<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.task
}
}
impl<T> std::ops::DerefMut for Abortable<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.task
}
}
impl<T> std::convert::AsMut<T> for Abortable<T> {
fn as_mut(&mut self) -> &mut T {
&mut self.task
}
}
impl<T> Abortable<T> {
pub fn new(task: T, reg: AbortRegistration) -> Self {
Self { task, inner: reg.inner }
}
pub fn is_aborted(&self) -> bool {
self.inner.aborted.load(Ordering::Relaxed)
}
fn try_poll<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut poll: impl FnMut(Pin<&mut T>, &mut Context<'_>) -> Poll<I>,
) -> Poll<Result<I, Aborted>> {
if self.is_aborted() {
return Poll::Ready(Err(Aborted));
}
if let Poll::Ready(x) = poll(self.as_mut().project().task, cx) {
return Poll::Ready(Ok(x));
}
self.inner.waker.register(cx.waker());
if self.is_aborted() {
return Poll::Ready(Err(Aborted));
}
Poll::Pending
}
}
impl<Fut> futures::future::Future for Abortable<Fut>
where
Fut: futures::future::Future,
{
type Output = Result<Fut::Output, Aborted>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.try_poll(cx, |fut, cx| fut.poll(cx))
}
}
impl<St> futures::stream::Stream for Abortable<St>
where
St: futures::stream::Stream,
{
type Item = St::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.try_poll(cx, |stream, cx| stream.poll_next(cx))
.map(Result::ok)
.map(Option::flatten)
}
}
impl<R> tokio::io::AsyncRead for Abortable<R>
where
R: tokio::io::AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let r = self.try_poll(cx, |inner_fut, cx| inner_fut.poll_read(cx, buf));
match r {
Poll::Ready(outer_res) => match outer_res {
Ok(inner_res) => Poll::Ready(inner_res),
Err(Aborted) => {
return Poll::Ready(Ok(()));
}
},
Poll::Pending => Poll::Pending,
}
}
}
impl<R> tokio::io::AsyncWrite for Abortable<R>
where
R: tokio::io::AsyncWrite,
{
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::io::Result<usize>> {
let r = self.try_poll(cx, |inner_fut, cx| inner_fut.poll_write(cx, buf));
match r {
Poll::Ready(outer_res) => match outer_res {
Ok(inner_res) => Poll::Ready(inner_res),
Err(Aborted) => {
return Poll::Ready(Ok(0));
}
},
Poll::Pending => Poll::Pending,
}
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<std::io::Result<usize>> {
let r = self.try_poll(cx, |inner_fut, cx| inner_fut.poll_write_vectored(cx, bufs));
match r {
Poll::Ready(outer_res) => match outer_res {
Ok(inner_res) => Poll::Ready(inner_res),
Err(Aborted) => {
return Poll::Ready(Ok(0));
}
},
Poll::Pending => Poll::Pending,
}
}
fn is_write_vectored(&self) -> bool {
self.task.is_write_vectored()
}
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.as_mut().project().task.poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.as_mut().project().task.poll_shutdown(cx)
}
}
#[async_trait]
pub trait Channel: Send + Sized {
type Event: Send;
type Handle: Send + Clone + super::Shutdown;
type Inbox: Send + Sync;
type Metric: Collector + Clone;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<Self::Metric>,
Option<Box<dyn Route<Self::Event>>>,
);
fn type_name() -> std::borrow::Cow<'static, str> {
std::any::type_name::<Self>().into()
}
}
#[async_trait::async_trait]
pub trait ChannelBuilder<C: Channel> {
async fn build_channel(&mut self) -> ActorResult<C>;
}
#[async_trait::async_trait]
pub trait Route<M>: Send + Sync + dyn_clone::DynClone {
async fn try_send_msg(&self, message: M) -> anyhow::Result<Option<M>>;
async fn send_msg(&self, message: M) -> anyhow::Result<()>;
}
dyn_clone::clone_trait_object!(<M> Route<M>);
pub struct UnboundedChannel<E> {
abort_handle: AbortHandle,
abort_registration: AbortRegistration,
tx: UnboundedSender<E>,
rx: UnboundedReceiver<E>,
}
impl<E> UnboundedChannel<E> {
pub fn new() -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<E>();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
Self {
abort_handle,
abort_registration,
tx,
rx,
}
}
}
#[derive(Debug)]
pub struct UnboundedInbox<T> {
metric: prometheus::IntGauge,
inner: UnboundedReceiver<T>,
}
impl<T> UnboundedInbox<T> {
pub fn new(recv: UnboundedReceiver<T>, gauge: prometheus::IntGauge) -> Self {
Self {
inner: recv,
metric: gauge,
}
}
pub fn close(&mut self) {
self.inner.close()
}
}
impl<T> tokio_stream::Stream for UnboundedInbox<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let r = self.inner.poll_recv(cx);
if let &Poll::Ready(Some(_)) = &r {
self.metric.dec();
}
r
}
}
#[derive(Debug)]
pub struct UnboundedHandle<T> {
scope_id: ScopeId,
abort_handle: AbortHandle,
metric: prometheus::IntGauge,
inner: UnboundedSender<T>,
}
impl<T> Clone for UnboundedHandle<T> {
fn clone(&self) -> Self {
Self {
scope_id: self.scope_id,
abort_handle: self.abort_handle.clone(),
metric: self.metric.clone(),
inner: self.inner.clone(),
}
}
}
impl<T> UnboundedHandle<T> {
pub fn new(
sender: UnboundedSender<T>,
gauge: prometheus::IntGauge,
abort_handle: AbortHandle,
scope_id: ScopeId,
) -> Self {
Self {
scope_id,
abort_handle,
metric: gauge,
inner: sender,
}
}
pub fn send(&self, message: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
let r = self.inner.send(message);
if r.is_ok() {
self.metric.inc()
}
r
}
pub fn send_after(
&self,
message: T,
duration: std::time::Duration,
) -> tokio::task::JoinHandle<Result<(), tokio::sync::mpsc::error::SendError<T>>>
where
T: Send + 'static,
{
let h = self.clone();
let fut = async move {
tokio::time::sleep(duration).await;
h.send(message)
};
tokio::spawn(fut)
}
pub async fn closed(&self) {
self.inner.closed().await
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
pub fn same_channel(&self, other: &Self) -> bool {
self.inner.same_channel(&other.inner)
}
}
#[async_trait::async_trait]
impl<A: Send + 'static, T: ServiceEvent<A>> SupHandle<A> for UnboundedHandle<T> {
type Event = T;
async fn report(&self, scope_id: ScopeId, data: Service) -> Option<()> {
self.send(T::report_event(scope_id, data)).ok()
}
async fn eol(self, scope_id: super::ScopeId, service: Service, actor: A, r: ActorResult<()>) -> Option<()> {
self.send(T::eol_event(scope_id, service, actor, r)).ok()
}
}
#[async_trait::async_trait]
impl<E: ShutdownEvent + 'static, T> ChannelBuilder<UnboundedChannel<E>> for T
where
T: Send,
{
async fn build_channel(&mut self) -> ActorResult<UnboundedChannel<E>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<E>();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
Ok(UnboundedChannel {
abort_handle,
abort_registration,
tx,
rx,
})
}
}
impl<E: ShutdownEvent + 'static> Channel for UnboundedChannel<E> {
type Event = E;
type Handle = UnboundedHandle<E>;
type Inbox = UnboundedInbox<E>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<Self::Metric>,
Option<Box<dyn Route<Self::Event>>>,
) {
let metric_fq_name = format!("ScopeId:{}", scope_id);
let metric_helper_name = format!(
"ScopeId: {}, Actor: {}, Channel: {}",
scope_id,
std::any::type_name::<T>(),
Self::type_name()
);
let gauge = prometheus::core::GenericGauge::new(metric_fq_name, metric_helper_name)
.expect("channel gauge can be created");
let sender = self.tx;
let recv = self.rx;
let abort_handle = self.abort_handle;
let abort_registration = self.abort_registration;
let unbounded_handle = UnboundedHandle::new(sender, gauge.clone(), abort_handle, scope_id);
let unbounded_inbox = UnboundedInbox::new(recv, gauge.clone());
let route = Box::new(unbounded_handle.clone());
(
unbounded_handle,
unbounded_inbox,
abort_registration,
Some(gauge),
Some(route),
)
}
}
#[async_trait::async_trait]
impl<E: 'static + ShutdownEvent> super::Shutdown for UnboundedHandle<E> {
async fn shutdown(&self) {
self.abort_handle.abort();
self.send(E::shutdown_event()).ok();
}
fn scope_id(&self) -> super::ScopeId {
self.scope_id
}
}
#[async_trait::async_trait]
impl<M: 'static + Send, E: Send> Route<M> for UnboundedHandle<E>
where
E: std::convert::TryFrom<M>,
{
async fn try_send_msg(&self, message: M) -> anyhow::Result<Option<M>> {
if let Ok(event) = E::try_from(message) {
if let Err(error) = self.send(event) {
anyhow::bail!("SendError: {}", error)
};
} else {
anyhow::bail!("Unable to convert the provided message into channel event")
};
Ok(None)
}
async fn send_msg(&self, message: M) -> anyhow::Result<()> {
if let Ok(event) = E::try_from(message) {
if let Err(error) = self.send(event) {
anyhow::bail!("{}", error)
};
Ok(())
} else {
anyhow::bail!("Unable to convert the provided message into channel event")
}
}
}
pub struct AbortableUnboundedChannel<E> {
abort_handle: AbortHandle,
abort_registration: AbortRegistration,
tx: UnboundedSender<E>,
rx: UnboundedReceiver<E>,
}
impl<E> AbortableUnboundedChannel<E> {
pub fn new() -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<E>();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
AbortableUnboundedChannel {
abort_handle,
abort_registration,
tx,
rx,
}
}
}
#[async_trait::async_trait]
impl<A: Send + 'static, T: ServiceEvent<A>> SupHandle<A> for AbortableUnboundedHandle<T> {
type Event = T;
async fn report(&self, scope_id: ScopeId, service: Service) -> Option<()> {
self.send(T::report_event(scope_id, service)).ok()
}
async fn eol(self, scope_id: super::ScopeId, service: Service, actor: A, r: super::ActorResult<()>) -> Option<()> {
self.send(T::eol_event(scope_id, service, actor, r)).ok()
}
}
#[async_trait::async_trait]
impl<E: Send + 'static, T> ChannelBuilder<AbortableUnboundedChannel<E>> for T
where
T: Send,
{
async fn build_channel(&mut self) -> ActorResult<AbortableUnboundedChannel<E>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<E>();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
Ok(AbortableUnboundedChannel {
abort_handle,
abort_registration,
tx,
rx,
})
}
}
#[derive(Debug)]
pub struct AbortableUnboundedInbox<T> {
inner: Abortable<UnboundedInbox<T>>,
}
impl<T> AbortableUnboundedInbox<T> {
pub fn new(inbox: UnboundedInbox<T>, abort_registration: AbortRegistration) -> Self {
let abortable = Abortable::new(inbox, abort_registration);
Self { inner: abortable }
}
pub fn close(&mut self) {
self.inner.as_mut().close()
}
}
impl<T> tokio_stream::Stream for AbortableUnboundedInbox<T>
where
Abortable<UnboundedInbox<T>>: tokio_stream::Stream,
{
type Item = <Abortable<UnboundedInbox<T>> as tokio_stream::Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
#[derive(Debug)]
pub struct AbortableUnboundedHandle<T> {
inner: UnboundedHandle<T>,
}
impl<T> Clone for AbortableUnboundedHandle<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> AbortableUnboundedHandle<T> {
pub fn new(sender: UnboundedHandle<T>) -> Self {
Self { inner: sender }
}
pub fn send(&self, message: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
self.inner.send(message)
}
pub fn send_after(
&self,
message: T,
duration: std::time::Duration,
) -> tokio::task::JoinHandle<Result<(), tokio::sync::mpsc::error::SendError<T>>>
where
T: Send + 'static,
{
self.inner.send_after(message, duration)
}
pub async fn closed(&self) {
self.inner.closed().await
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
pub fn same_channel(&self, other: &Self) -> bool {
self.inner.same_channel(&other.inner)
}
}
impl<E: Send + 'static> Channel for AbortableUnboundedChannel<E> {
type Event = E;
type Handle = AbortableUnboundedHandle<E>;
type Inbox = AbortableUnboundedInbox<E>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<E>>>,
) {
let metric_fq_name = format!("ScopeId:{}", scope_id);
let metric_helper_name = format!(
"ScopeId: {}, Actor: {}, Channel: {}",
scope_id,
std::any::type_name::<T>(),
Self::type_name()
);
let gauge = prometheus::core::GenericGauge::new(metric_fq_name, metric_helper_name)
.expect("channel gauge can be created");
let sender = self.tx;
let recv = self.rx;
let abort_handle = self.abort_handle;
let abort_registration = self.abort_registration;
let unbounded_handle = UnboundedHandle::new(sender, gauge.clone(), abort_handle, scope_id);
let unbounded_inbox = UnboundedInbox::new(recv, gauge.clone());
let abortable_unbounded_handle = AbortableUnboundedHandle::new(unbounded_handle);
let abortable_unbounded_inbox = AbortableUnboundedInbox::new(unbounded_inbox, abort_registration.clone());
let route = Box::new(abortable_unbounded_handle.clone());
(
abortable_unbounded_handle,
abortable_unbounded_inbox,
abort_registration,
Some(gauge),
Some(route),
)
}
}
#[async_trait::async_trait]
impl<E: Send + 'static> super::Shutdown for AbortableUnboundedHandle<E> {
async fn shutdown(&self) {
self.inner.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.inner.scope_id
}
}
#[async_trait::async_trait]
impl<M: 'static + Send, E: Send> Route<M> for AbortableUnboundedHandle<E>
where
E: std::convert::TryFrom<M>,
{
async fn try_send_msg(&self, message: M) -> anyhow::Result<Option<M>> {
self.inner.try_send_msg(message).await
}
async fn send_msg(&self, message: M) -> anyhow::Result<()> {
if let Ok(event) = E::try_from(message) {
if let Err(error) = self.send(event) {
anyhow::bail!("{}", error)
};
Ok(())
} else {
anyhow::bail!("Unable to convert the provided message into channel event")
}
}
}
pub struct BoundedChannel<E, const C: usize> {
abort_handle: AbortHandle,
abort_registration: AbortRegistration,
tx: Sender<E>,
rx: Receiver<E>,
}
#[derive(Debug)]
pub struct BoundedInbox<T> {
metric: prometheus::IntGauge,
inner: Receiver<T>,
}
impl<T> BoundedInbox<T> {
pub fn new(recv: Receiver<T>, gauge: prometheus::IntGauge) -> Self {
Self {
inner: recv,
metric: gauge,
}
}
pub fn close(&mut self) {
self.inner.close()
}
}
impl<T> tokio_stream::Stream for BoundedInbox<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let r = self.inner.poll_recv(cx);
if let &Poll::Ready(Some(_)) = &r {
self.metric.dec();
}
r
}
}
#[derive(Debug)]
pub struct BoundedHandle<T> {
scope_id: ScopeId,
abort_handle: AbortHandle,
metric: prometheus::IntGauge,
inner: Sender<T>,
}
impl<T> Clone for BoundedHandle<T> {
fn clone(&self) -> Self {
Self {
scope_id: self.scope_id.clone(),
abort_handle: self.abort_handle.clone(),
metric: self.metric.clone(),
inner: self.inner.clone(),
}
}
}
impl<T> BoundedHandle<T> {
pub fn new(sender: Sender<T>, gauge: prometheus::IntGauge, abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self {
scope_id,
abort_handle,
metric: gauge,
inner: sender,
}
}
pub async fn send(&self, message: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
let r = self.inner.send(message).await;
if r.is_ok() {
self.metric.inc()
}
r
}
pub fn send_after(
&self,
message: T,
duration: std::time::Duration,
) -> tokio::task::JoinHandle<Result<(), tokio::sync::mpsc::error::SendError<T>>>
where
T: Send + 'static,
{
let h = self.clone();
let fut = async move {
tokio::time::sleep(duration).await;
h.send(message).await
};
tokio::spawn(fut)
}
pub async fn closed(&self) {
self.inner.closed().await
}
pub fn try_send(&self, message: T) -> Result<(), tokio::sync::mpsc::error::TrySendError<T>> {
let r = self.inner.try_send(message);
if r.is_ok() {
self.metric.inc();
}
r
}
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub async fn send_timeout(
&self,
value: T,
timeout: Duration,
) -> Result<(), tokio::sync::mpsc::error::SendTimeoutError<T>> {
let r = self.inner.send_timeout(message).await;
if r.is_ok() {
self.metric.inc();
}
r
}
#[cfg(feature = "sync")]
pub fn blocking_send(&self, value: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
let r = self.inner.blocking_send(message);
if r.is_ok() {
self.metric.inc();
}
r
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
pub fn same_channel(&self, other: &Self) -> bool {
self.inner.same_channel(&other.inner)
}
}
#[async_trait::async_trait]
impl<A: Send + 'static, T: ServiceEvent<A>> SupHandle<A> for AbortableBoundedHandle<T> {
type Event = T;
async fn report(&self, scope_id: ScopeId, service: Service) -> Option<()> {
self.send(T::report_event(scope_id, service)).await.ok()
}
async fn eol(self, scope_id: super::ScopeId, service: Service, actor: A, r: ActorResult<()>) -> Option<()> {
self.send(T::eol_event(scope_id, service, actor, r)).await.ok()
}
}
#[async_trait::async_trait]
impl<E: ShutdownEvent + 'static, T, const C: usize> ChannelBuilder<BoundedChannel<E, C>> for T
where
T: Send,
{
async fn build_channel(&mut self) -> ActorResult<BoundedChannel<E, C>> {
let (tx, rx) = tokio::sync::mpsc::channel::<E>(C);
let (abort_handle, abort_registration) = AbortHandle::new_pair();
Ok(BoundedChannel {
abort_handle,
abort_registration,
tx,
rx,
})
}
}
impl<E: ShutdownEvent + 'static, const C: usize> Channel for BoundedChannel<E, C> {
type Event = E;
type Handle = BoundedHandle<E>;
type Inbox = BoundedInbox<E>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<Self::Metric>,
Option<Box<dyn Route<E>>>,
) {
let metric_fq_name = format!("ScopeId:{}", scope_id);
let metric_helper_name = format!(
"ScopeId: {}, Actor: {}, Channel: {}",
scope_id,
std::any::type_name::<T>(),
Self::type_name()
);
let gauge = prometheus::core::GenericGauge::new(metric_fq_name, metric_helper_name)
.expect("channel gauge can be created");
let sender = self.tx;
let recv = self.rx;
let abort_handle = self.abort_handle;
let abort_registration = self.abort_registration;
let unbounded_handle = BoundedHandle::new(sender, gauge.clone(), abort_handle, scope_id);
let unbounded_inbox = BoundedInbox::new(recv, gauge.clone());
let route = Box::new(unbounded_handle.clone());
(
unbounded_handle,
unbounded_inbox,
abort_registration,
Some(gauge),
Some(route),
)
}
}
#[async_trait::async_trait]
impl<E: 'static + ShutdownEvent> super::Shutdown for BoundedHandle<E> {
async fn shutdown(&self) {
self.abort_handle.abort();
self.send(E::shutdown_event()).await.ok();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
#[async_trait::async_trait]
impl<M: 'static + Send, E: Send> Route<M> for BoundedHandle<E>
where
E: std::convert::TryFrom<M>,
E::Error: Send,
{
async fn try_send_msg(&self, message: M) -> anyhow::Result<Option<M>> {
match self.inner.try_reserve() {
Ok(permit) => {
if let Ok(event) = E::try_from(message) {
self.metric.inc();
permit.send(event);
Ok(None)
} else {
anyhow::bail!("Unabled to convert the provided message into event type");
}
}
Err(err) => {
if let TrySendError::Full(()) = err {
Ok(Some(message))
} else {
anyhow::bail!("Closed channel")
}
}
}
}
async fn send_msg(&self, message: M) -> anyhow::Result<()> {
if let Ok(event) = E::try_from(message) {
self.send(event).await.map_err(|e| anyhow::Error::msg(format!("{}", e)))
} else {
anyhow::bail!("Unabled to convert the provided message into event type")
}
}
}
pub struct AbortableBoundedChannel<E, const C: usize> {
abort_handle: AbortHandle,
abort_registration: AbortRegistration,
tx: Sender<E>,
rx: Receiver<E>,
}
#[derive(Debug)]
pub struct AbortableBoundedInbox<T> {
inner: Abortable<BoundedInbox<T>>,
}
#[async_trait::async_trait]
impl<E: Send + 'static, T, const C: usize> ChannelBuilder<AbortableBoundedChannel<E, C>> for T
where
T: Send,
{
async fn build_channel(&mut self) -> ActorResult<AbortableBoundedChannel<E, C>> {
let (tx, rx) = tokio::sync::mpsc::channel::<E>(C);
let (abort_handle, abort_registration) = AbortHandle::new_pair();
Ok(AbortableBoundedChannel {
abort_handle,
abort_registration,
tx,
rx,
})
}
}
impl<T> AbortableBoundedInbox<T> {
pub fn new(inbox: BoundedInbox<T>, abort_registration: AbortRegistration) -> Self {
let abortable = Abortable::new(inbox, abort_registration);
Self { inner: abortable }
}
pub fn close(&mut self) {
self.inner.as_mut().close()
}
}
#[derive(Debug)]
pub struct AbortableBoundedHandle<T> {
inner: BoundedHandle<T>,
}
impl<T> Clone for AbortableBoundedHandle<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> AbortableBoundedHandle<T> {
pub fn new(sender: BoundedHandle<T>) -> Self {
Self { inner: sender }
}
pub async fn send(&self, message: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
self.inner.send(message).await
}
pub fn send_after(
&self,
message: T,
duration: std::time::Duration,
) -> tokio::task::JoinHandle<Result<(), tokio::sync::mpsc::error::SendError<T>>>
where
T: Send + 'static,
{
self.inner.send_after(message, duration)
}
pub async fn closed(&self) {
self.inner.closed().await
}
pub fn try_send(&self, message: T) -> Result<(), tokio::sync::mpsc::error::TrySendError<T>> {
self.inner.try_send(message)
}
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub async fn send_timeout(
&self,
value: T,
timeout: Duration,
) -> Result<(), tokio::sync::mpsc::error::SendTimeoutError<T>> {
self.inner.send_timeout(message).await
}
#[cfg(feature = "sync")]
pub fn blocking_send(&self, value: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
self.inner.blocking_send(message)
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
pub fn same_channel(&self, other: &Self) -> bool {
self.inner.same_channel(&other.inner)
}
}
impl<E: Send + 'static, const C: usize> Channel for AbortableBoundedChannel<E, C> {
type Event = E;
type Handle = AbortableBoundedHandle<E>;
type Inbox = AbortableBoundedInbox<E>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<E>>>,
) {
let metric_fq_name = format!("ScopeId:{}", scope_id);
let metric_helper_name = format!(
"ScopeId: {}, Actor: {}, Channel: {}",
scope_id,
std::any::type_name::<T>(),
Self::type_name()
);
let gauge = prometheus::core::GenericGauge::new(metric_fq_name, metric_helper_name)
.expect("channel gauge can be created");
let sender = self.tx;
let recv = self.rx;
let abort_handle = self.abort_handle;
let abort_registration = self.abort_registration;
let unbounded_handle = BoundedHandle::new(sender, gauge.clone(), abort_handle, scope_id);
let unbounded_inbox = BoundedInbox::new(recv, gauge.clone());
let abortable_unbounded_handle = AbortableBoundedHandle::new(unbounded_handle);
let abortable_unbounded_inbox = AbortableBoundedInbox::new(unbounded_inbox, abort_registration.clone());
let route = Box::new(abortable_unbounded_handle.clone());
(
abortable_unbounded_handle,
abortable_unbounded_inbox,
abort_registration,
Some(gauge),
Some(route),
)
}
}
#[async_trait::async_trait]
impl<E: Send + 'static> super::Shutdown for AbortableBoundedHandle<E> {
async fn shutdown(&self) {
self.inner.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.inner.scope_id
}
}
#[async_trait::async_trait]
impl<M: 'static + Send, E: Send> Route<M> for AbortableBoundedHandle<E>
where
E: std::convert::TryFrom<M>,
E::Error: Send,
{
async fn try_send_msg(&self, message: M) -> anyhow::Result<Option<M>> {
self.inner.try_send_msg(message).await
}
async fn send_msg(&self, message: M) -> anyhow::Result<()> {
if let Ok(event) = E::try_from(message) {
self.send(event).await.map_err(|e| anyhow::Error::msg(format!("{}", e)))
} else {
anyhow::bail!("Unabled to convert the provided message into event type")
}
}
}
#[derive(Clone)]
pub struct TcpListenerHandle(AbortHandle, ScopeId);
#[async_trait::async_trait]
impl super::Shutdown for TcpListenerHandle {
async fn shutdown(&self) {
self.0.abort();
}
fn scope_id(&self) -> ScopeId {
self.1
}
}
impl Channel for TcpListenerStream {
type Event = ();
type Handle = TcpListenerHandle;
type Inbox = Abortable<TcpListenerStream>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let abortable_inbox = Abortable::new(self, abort_registration.clone());
let abortable_handle = TcpListenerHandle(abort_handle, scope_id);
(abortable_handle, abortable_inbox, abort_registration, None, None)
}
}
#[cfg(feature = "hyperserver")]
mod hyper_channels {
use super::*;
pub use ::hyper;
pub struct HyperChannel<S> {
server: ::hyper::server::Builder<::hyper::server::conn::AddrIncoming>,
service: S,
}
impl<S: Send> HyperChannel<S> {
pub fn new(server: ::hyper::server::Builder<::hyper::server::conn::AddrIncoming>, service: S) -> Self {
Self { server, service }
}
}
use ::hyper::{
server::conn::{AddrIncoming, AddrStream},
Body, Request, Response,
};
impl<S, E, R, F, B> Channel for HyperChannel<S>
where
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Send + Sync + std::error::Error,
for<'a> S: ::hyper::service::Service<&'a AddrStream, Error = E, Response = R, Future = F> + Send,
E: std::error::Error + Send + Sync + 'static,
S: Send + 'static + Sync,
F: Send + std::future::Future<Output = Result<R, E>> + 'static,
R: Send + ::hyper::service::Service<Request<Body>, Response = Response<B>> + 'static,
R::Error: std::error::Error + Send + Sync,
R::Future: Send,
{
type Event = ();
type Handle = HyperHandle;
type Inbox = HyperInbox<S>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let hyper_handle = HyperHandle::new(abort_handle, scope_id);
let hyper_inbox = HyperInbox::new(self.server, self.service, abort_registration.clone());
(hyper_handle, hyper_inbox, abort_registration, None, None)
}
}
#[derive(Clone)]
pub struct HyperHandle {
abort_handle: AbortHandle,
scope_id: ScopeId,
}
impl HyperHandle {
pub fn new(abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self { abort_handle, scope_id }
}
}
#[async_trait::async_trait]
impl Shutdown for HyperHandle {
async fn shutdown(&self) {
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
unsafe impl<S> Sync for HyperInbox<S> {}
pub struct HyperInbox<S> {
builder: Option<hyper::server::Builder<AddrIncoming>>,
service: Option<S>,
abort_registration: AbortRegistration,
}
impl<S, E, R, F, B> HyperInbox<S>
where
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Send + Sync + std::error::Error,
for<'a> S: ::hyper::service::Service<&'a AddrStream, Error = E, Response = R, Future = F> + Send,
E: std::error::Error + Send + Sync + 'static,
S: Send + 'static + Sync,
F: Send + std::future::Future<Output = Result<R, E>> + 'static,
R: Send + ::hyper::service::Service<Request<Body>, Response = Response<B>> + 'static,
R::Error: std::error::Error + Send + Sync,
R::Future: Send,
{
pub fn new(
builder: hyper::server::Builder<AddrIncoming>,
service: S,
abort_registration: AbortRegistration,
) -> Self {
Self {
builder: Some(builder),
service: Some(service),
abort_registration,
}
}
pub async fn ignite(&mut self) -> Result<(), ::hyper::Error> {
if let (Some(server), Some(service)) = (self.builder.take(), self.service.take()) {
let f = futures::future::pending::<()>();
let abortable = Abortable::new(f, self.abort_registration.clone());
server
.serve(service)
.with_graceful_shutdown(async {
abortable.await.ok();
})
.await?;
}
Ok(())
}
}
}
#[cfg(feature = "hyperserver")]
pub use hyper_channels::*;
#[cfg(feature = "tungstenite")]
pub use tokio_tungstenite;
pub struct IntervalChannel<const I: u64>;
#[derive(Clone)]
pub struct IntervalHandle {
scope_id: ScopeId,
abort_handle: AbortHandle,
}
impl IntervalHandle {
pub fn new(abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self { scope_id, abort_handle }
}
}
#[async_trait::async_trait]
impl<T, const I: u64> ChannelBuilder<IntervalChannel<I>> for T
where
T: Send,
{
async fn build_channel(&mut self) -> ActorResult<IntervalChannel<I>> {
Ok(IntervalChannel::<I>)
}
}
impl<const I: u64> Channel for IntervalChannel<I> {
type Event = std::time::Instant;
type Handle = IntervalHandle;
type Inbox = Abortable<IntervalStream>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<Self::Event>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let interval = tokio::time::interval(std::time::Duration::from_millis(I));
let interval_handle = IntervalHandle::new(abort_handle, scope_id);
let abortable_inbox = Abortable::new(IntervalStream::new(interval), abort_registration.clone());
(interval_handle, abortable_inbox, abort_registration, None, None)
}
}
impl Channel for std::time::Duration {
type Event = std::time::Instant;
type Handle = IntervalHandle;
type Inbox = Abortable<IntervalStream>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<Self::Event>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let interval = tokio::time::interval(self);
let interval_handle = IntervalHandle::new(abort_handle, scope_id);
let abortable_inbox = Abortable::new(IntervalStream::new(interval), abort_registration.clone());
(interval_handle, abortable_inbox, abort_registration, None, None)
}
}
#[async_trait::async_trait]
impl super::Shutdown for IntervalHandle {
async fn shutdown(&self) {
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
#[async_trait::async_trait]
impl<T> ChannelBuilder<NullChannel> for T
where
T: Send,
{
async fn build_channel(&mut self) -> ActorResult<NullChannel> {
Ok(NullChannel)
}
}
pub struct NullChannel;
pub struct NullInbox;
#[derive(Clone)]
pub struct NullHandle {
scope_id: ScopeId,
abort_handle: AbortHandle,
}
impl NullHandle {
pub fn new(scope_id: ScopeId, abort_handle: AbortHandle) -> Self {
Self { scope_id, abort_handle }
}
}
impl Channel for NullChannel {
type Event = ();
type Handle = NullHandle;
type Inbox = NullInbox;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<Self::Event>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let null_handle = NullHandle::new(scope_id, abort_handle);
(null_handle, NullInbox, abort_registration, None, None)
}
}
#[async_trait::async_trait]
impl super::Shutdown for NullHandle {
async fn shutdown(&self) {
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
pub struct IoChannel<T>(pub T);
impl<T> IoChannel<T> {
pub fn new(channel: T) -> Self {
Self(channel)
}
}
#[derive(Clone)]
pub struct IoHandle(AbortHandle, ScopeId);
impl IoHandle {
fn new(abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self(abort_handle, scope_id)
}
}
#[async_trait::async_trait]
impl Shutdown for IoHandle {
async fn shutdown(&self) {
self.0.abort()
}
fn scope_id(&self) -> ScopeId {
self.1
}
}
impl<S> Channel for IoChannel<S>
where
S: Send + 'static + Sync,
{
type Event = ();
type Handle = IoHandle;
type Inbox = Abortable<S>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let abortable_inbox = Abortable::new(self.0, abort_registration.clone());
let abortable_handle = IoHandle::new(abort_handle, scope_id);
(abortable_handle, abortable_inbox, abort_registration, None, None)
}
}
pub struct Marker<T, B> {
inner: T,
_marker: std::marker::PhantomData<B>,
}
impl<T, B> Clone for Marker<T, B>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_marker: std::marker::PhantomData,
}
}
}
impl<T, B> Marker<T, B> {
pub fn new(inner: T) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
}
}
}
impl<T, B> std::ops::Deref for Marker<T, B> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T, B> std::ops::DerefMut for Marker<T, B> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
#[async_trait::async_trait]
impl<T, C, B: Send + 'static + Sync> ChannelBuilder<Marker<C, B>> for T
where
T: Send + ChannelBuilder<C>,
C: Channel,
{
async fn build_channel(&mut self) -> ActorResult<Marker<C, B>> {
let channel = <T as ChannelBuilder<C>>::build_channel(&mut self).await?;
Ok(Marker::new(channel))
}
}
impl<T, B: Send + Sync + 'static> Channel for Marker<T, B>
where
T: Channel,
{
type Event = T::Event;
type Handle = Marker<T::Handle, B>;
type Inbox = T::Inbox;
type Metric = T::Metric;
fn channel<A>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<T::Metric>,
Option<Box<dyn Route<T::Event>>>,
) {
let (h, i, a, m, r) = T::channel::<A>(self.inner, scope_id);
(Marker::new(h), i, a, m, r)
}
}
#[async_trait::async_trait]
impl<T: Shutdown + Clone, B: Send + Sync + 'static> Shutdown for Marker<T, B> {
async fn shutdown(&self) {
self.inner.shutdown().await
}
fn scope_id(&self) -> ScopeId {
self.inner.scope_id()
}
}
#[cfg(feature = "tonicserver")]
mod tonic_channels {
use super::*;
use ::hyper::server::accept::Accept;
pub use tonic;
use tonic::{
transport::server::{NamedService, Server},
Streaming,
};
use tower::Service;
pub struct TonicChannel<S> {
server: Server,
service: S,
incoming: ::hyper::server::conn::AddrIncoming,
}
impl<S> TonicChannel<S> {
pub fn new(server: Server, service: S, incoming: ::hyper::server::conn::AddrIncoming) -> Self {
Self {
server,
incoming,
service,
}
}
}
impl<S: Send + Sync, B> Channel for TonicChannel<S>
where
S: Service<hyper::Request<hyper::Body>, Response = hyper::Response<B>> + NamedService,
S::Future: Send,
{
type Event = ();
type Handle = TonicHandle;
type Inbox = TonicInbox<S>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let tonic_handle = TonicHandle::new(abort_handle, scope_id);
let tonic_inbox = TonicInbox::new(self.server, self.service, self.incoming, abort_registration.clone());
(tonic_handle, tonic_inbox, abort_registration, None, None)
}
}
#[derive(Clone)]
pub struct TonicHandle {
abort_handle: AbortHandle,
scope_id: ScopeId,
}
impl TonicHandle {
pub fn new(abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self { abort_handle, scope_id }
}
}
#[async_trait::async_trait]
impl Shutdown for TonicHandle {
async fn shutdown(&self) {
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
struct TcpIncoming(::hyper::server::conn::AddrIncoming);
impl tokio_stream::Stream for TcpIncoming {
type Item = Result<::hyper::server::conn::AddrStream, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_accept(cx)
}
}
pub struct TonicInbox<S> {
server: Option<Server>,
service: Option<S>,
incoming: Option<::hyper::server::conn::AddrIncoming>,
abort_registration: AbortRegistration,
}
impl<S: Send + 'static + Clone> TonicInbox<S>
where
S: Service<
hyper::Request<hyper::Body>,
Response = hyper::Response<tonic::body::BoxBody>,
Error = std::convert::Infallible,
> + NamedService,
S::Future: Send,
{
pub async fn ignite(&mut self) -> Result<(), ::tonic::transport::Error> {
if let Some(incoming) = self.incoming.take() {
let incoming = TcpIncoming(incoming);
let f = futures::future::pending::<()>();
let abortable = Abortable::new(f, self.abort_registration.clone());
if let Some(mut server) = self.server.take() {
server
.add_optional_service(self.service.take())
.serve_with_incoming_shutdown(incoming, async {
abortable.await.ok();
})
.await?;
}
}
Ok(())
}
}
impl<S> TonicInbox<S> {
pub fn new(
server: Server,
service: S,
incoming: ::hyper::server::conn::AddrIncoming,
abort_registration: AbortRegistration,
) -> Self {
Self {
server: Some(server),
service: Some(service),
incoming: Some(incoming),
abort_registration,
}
}
}
pub struct UnboundedBiStreamingChannel<T> {
in_stream: Streaming<T>,
}
impl<T> UnboundedBiStreamingChannel<T> {
pub fn new(in_stream: Streaming<T>) -> Self {
Self { in_stream }
}
}
impl<T> Channel for UnboundedBiStreamingChannel<T> {
type Event = ();
type Handle = UnboundedBiStreamingHandle;
type Inbox = BiStreamingInbox<T>;
type Metric = prometheus::IntGauge;
fn channel<TT>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let streaming = self.in_stream;
(
UnboundedBiStreamingHandle::new(abort_handle, scope_id),
BiStreamingInbox::new(streaming, abort_registration.clone()),
abort_registration,
None,
None,
)
}
}
#[derive(Clone)]
pub struct UnboundedBiStreamingHandle {
abort_handle: AbortHandle,
scope_id: ScopeId,
}
impl UnboundedBiStreamingHandle {
pub fn new(abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self { abort_handle, scope_id }
}
}
#[async_trait::async_trait]
impl Shutdown for UnboundedBiStreamingHandle {
async fn shutdown(&self) {
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
pub struct BoundedBiStreamingChannel<T> {
in_stream: Streaming<T>,
}
impl<T> BoundedBiStreamingChannel<T> {
pub fn new(in_stream: Streaming<T>) -> Self {
Self { in_stream }
}
}
impl<T> Channel for BoundedBiStreamingChannel<T> {
type Event = ();
type Handle = BoundedBiStreamingHandle;
type Inbox = BiStreamingInbox<T>;
type Metric = prometheus::IntGauge;
fn channel<TT>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let streaming = self.in_stream;
(
BoundedBiStreamingHandle::new(abort_handle, scope_id),
BiStreamingInbox::new(streaming, abort_registration.clone()),
abort_registration,
None,
None,
)
}
}
#[derive(Clone)]
pub struct BoundedBiStreamingHandle {
abort_handle: AbortHandle,
scope_id: ScopeId,
}
impl BoundedBiStreamingHandle {
pub fn new(abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self { abort_handle, scope_id }
}
}
#[async_trait::async_trait]
impl Shutdown for BoundedBiStreamingHandle {
async fn shutdown(&self) {
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
pub struct BiStreamingInbox<T> {
streaming: Abortable<tonic::Streaming<T>>,
}
unsafe impl<T> Sync for BiStreamingInbox<T> {}
impl<T> BiStreamingInbox<T> {
pub fn new(streaming: tonic::Streaming<T>, abort_registration: AbortRegistration) -> Self {
let streaming = Abortable::new(streaming, abort_registration);
Self { streaming }
}
pub fn in_stream(&mut self) -> &mut tonic::Streaming<T> {
&mut self.streaming
}
}
}
#[cfg(feature = "tonicserver")]
pub use self::tonic_channels::*;
#[cfg(feature = "rocket")]
mod rocket_channels {
use super::*;
pub use ::rocket;
use ::rocket::Ignite;
impl Channel for ::rocket::Rocket<Ignite> {
type Event = ();
type Handle = RocketHandle;
type Inbox = RocketInbox;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let rocket_shutdown = self.shutdown();
let rocket_handle = RocketHandle::new(rocket_shutdown, abort_handle, scope_id);
let rocket_inbox = RocketInbox::new(self);
(rocket_handle, rocket_inbox, abort_registration, None, None)
}
}
#[derive(Clone)]
pub struct RocketHandle {
abort_handle: AbortHandle,
rocket_shutdown: ::rocket::Shutdown,
scope_id: ScopeId,
}
impl RocketHandle {
pub fn new(rocket_shutdown: ::rocket::Shutdown, abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self {
abort_handle,
rocket_shutdown,
scope_id,
}
}
}
#[async_trait::async_trait]
impl Shutdown for RocketHandle {
async fn shutdown(&self) {
self.rocket_shutdown.clone().notify();
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
pub struct RocketInbox {
server: Option<::rocket::Rocket<::rocket::Ignite>>,
}
impl RocketInbox {
pub fn new(server: ::rocket::Rocket<::rocket::Ignite>) -> Self {
Self { server: Some(server) }
}
pub fn rocket(&mut self) -> Option<::rocket::Rocket<::rocket::Ignite>> {
self.server.take()
}
}
}
#[cfg(feature = "rocket")]
pub use self::rocket_channels::*;
#[cfg(feature = "paho-mqtt")]
mod paho_mqtt_channels {
use super::*;
pub use paho_mqtt;
use paho_mqtt::{AsyncClient, AsyncReceiver, Message};
pub struct MqttChannel {
async_client: AsyncClient,
stream: AsyncReceiver<Option<Message>>,
}
impl MqttChannel {
pub fn new(async_client: AsyncClient, stream: AsyncReceiver<Option<Message>>) -> Self {
Self { async_client, stream }
}
}
impl Channel for MqttChannel {
type Event = ();
type Handle = MqttHandle;
type Inbox = MqttInbox;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let inbox = MqttInbox::new(self.async_client, self.stream, abort_registration.clone());
let handle = MqttHandle::new(abort_handle, scope_id);
(handle, inbox, abort_registration, None, None)
}
}
#[derive(Clone)]
pub struct MqttHandle {
abort_handle: AbortHandle,
scope_id: ScopeId,
}
impl MqttHandle {
pub fn new(abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self { abort_handle, scope_id }
}
}
#[async_trait::async_trait]
impl Shutdown for MqttHandle {
async fn shutdown(&self) {
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
pub struct MqttInbox {
async_client: AsyncClient,
stream: Abortable<AsyncReceiver<Option<Message>>>,
abort_registration: AbortRegistration,
}
impl MqttInbox {
pub fn new(
async_client: AsyncClient,
stream: AsyncReceiver<Option<Message>>,
abort_registration: AbortRegistration,
) -> Self {
Self {
async_client,
stream: Abortable::new(stream, abort_registration.clone()),
abort_registration,
}
}
pub async fn reconnect_after<D: Into<std::time::Duration>>(
&mut self,
duration: D,
) -> ActorResult<::paho_mqtt::Result<::paho_mqtt::ServerResponse>> {
Abortable::new(tokio::time::sleep(duration.into()), self.abort_registration.clone())
.await
.map_err(|e| ActorError::aborted_msg(format!("mqtt aborted while reconnecting: {}", e)))?;
self.reconnect().await
}
pub async fn reconnect(&mut self) -> ActorResult<::paho_mqtt::Result<::paho_mqtt::ServerResponse>> {
let reconnect_fut = async { self.async_client.reconnect().await };
Abortable::new(reconnect_fut, self.abort_registration.clone())
.await
.map_err(|e| ActorError::aborted_msg(format!("mqtt aborted while reconnecting: {}", e)))
}
pub async fn subscribe<S>(
&self,
topic: S,
qos: i32,
) -> ActorResult<::paho_mqtt::Result<::paho_mqtt::ServerResponse>>
where
S: Into<String>,
{
let subscribe_fut = async { self.async_client.subscribe(topic, qos).await };
Abortable::new(subscribe_fut, self.abort_registration.clone())
.await
.map_err(|e| ActorError::aborted_msg(format!("mqtt aborted while subscribing: {}", e)))
}
pub fn stream(&mut self) -> &mut Abortable<AsyncReceiver<Option<Message>>> {
&mut self.stream
}
}
}
#[cfg(feature = "paho-mqtt")]
pub use self::paho_mqtt_channels::*;
#[cfg(feature = "axumserver")]
mod axum_channels {
use super::*;
use ::hyper::server::accept::Accept;
pub struct AxumChannel<T = std::net::SocketAddr>
where
T: for<'a> axum::extract::connect_info::Connected<&'a ::hyper::server::conn::AddrStream>,
{
router: axum::Router,
builder: hyper::server::Builder<::hyper::server::conn::AddrIncoming>,
_marker: std::marker::PhantomData<fn() -> T>,
}
impl<T: for<'a> axum::extract::connect_info::Connected<&'a ::hyper::server::conn::AddrStream>> AxumChannel<T> {
pub fn new(router: axum::Router, builder: hyper::server::Builder<::hyper::server::conn::AddrIncoming>) -> Self {
Self {
router,
builder,
_marker: std::marker::PhantomData,
}
}
}
unsafe impl<T: for<'a> axum::extract::connect_info::Connected<&'a ::hyper::server::conn::AddrStream>> Sync
for AxumInbox<T>
{
}
impl<C: for<'a> axum::extract::connect_info::Connected<&'a ::hyper::server::conn::AddrStream>> Channel
for AxumChannel<C>
{
type Event = ();
type Handle = AxumHandle;
type Inbox = AxumInbox<C>;
type Metric = prometheus::IntGauge;
fn channel<T>(
self,
scope_id: ScopeId,
) -> (
Self::Handle,
Self::Inbox,
AbortRegistration,
Option<prometheus::IntGauge>,
Option<Box<dyn Route<()>>>,
) {
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let tonic_handle = AxumHandle::new(abort_handle, scope_id);
let tonic_inbox = AxumInbox::new(self.router, self.builder, abort_registration.clone());
(tonic_handle, tonic_inbox, abort_registration, None, None)
}
}
#[derive(Clone)]
pub struct AxumHandle {
abort_handle: AbortHandle,
scope_id: ScopeId,
}
impl AxumHandle {
pub fn new(abort_handle: AbortHandle, scope_id: ScopeId) -> Self {
Self { abort_handle, scope_id }
}
}
#[async_trait::async_trait]
impl Shutdown for AxumHandle {
async fn shutdown(&self) {
self.abort_handle.abort();
}
fn scope_id(&self) -> ScopeId {
self.scope_id
}
}
struct TcpIncoming(::hyper::server::conn::AddrIncoming);
impl tokio_stream::Stream for TcpIncoming {
type Item = Result<::hyper::server::conn::AddrStream, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_accept(cx)
}
}
pub struct AxumInbox<T: for<'a> axum::extract::connect_info::Connected<&'a ::hyper::server::conn::AddrStream>> {
router: Option<axum::Router>,
builder: Option<hyper::server::Builder<::hyper::server::conn::AddrIncoming>>,
abort_registration: AbortRegistration,
_marker: std::marker::PhantomData<fn() -> T>,
}
impl<T: for<'a> axum::extract::connect_info::Connected<&'a ::hyper::server::conn::AddrStream>> AxumInbox<T> {
pub async fn ignite(&mut self) -> Result<(), ::hyper::Error> {
if let Some(builder) = self.builder.take() {
let f = futures::future::pending::<()>();
let abortable = Abortable::new(f, self.abort_registration.clone());
if let Some(server) = self.router.take() {
builder
.serve(server.into_make_service_with_connect_info::<T>())
.with_graceful_shutdown(async {
abortable.await.ok();
})
.await?;
}
}
Ok(())
}
}
impl<T: for<'a> axum::extract::connect_info::Connected<&'a ::hyper::server::conn::AddrStream>> AxumInbox<T> {
pub fn new(
router: axum::Router,
builder: hyper::server::Builder<hyper::server::conn::AddrIncoming>,
abort_registration: AbortRegistration,
) -> Self {
Self {
router: Some(router),
builder: Some(builder),
abort_registration,
_marker: std::marker::PhantomData,
}
}
}
}
#[cfg(feature = "axum")]
pub use axum_channels::*;