use crate::{types::Request, RpcSend, TaskSet};
use serde_json::value::RawValue;
use std::future::Future;
use tokio::{runtime::Handle, sync::mpsc, task::JoinHandle};
use tokio_util::sync::WaitForCancellationFutureOwned;
use tracing::error;
#[derive(thiserror::Error, Debug)]
pub enum NotifyError {
#[error("failed to serialize notification: {0}")]
Serde(#[from] serde_json::Error),
#[error("notification channel closed")]
Send(#[from] mpsc::error::SendError<Box<RawValue>>),
}
#[derive(Debug, Clone, Default)]
pub struct HandlerCtx {
pub(crate) notifications: Option<mpsc::Sender<Box<RawValue>>>,
pub(crate) tasks: TaskSet,
}
impl From<TaskSet> for HandlerCtx {
fn from(tasks: TaskSet) -> Self {
Self {
notifications: None,
tasks,
}
}
}
impl From<Handle> for HandlerCtx {
fn from(handle: Handle) -> Self {
Self {
notifications: None,
tasks: handle.into(),
}
}
}
impl HandlerCtx {
#[allow(dead_code)] pub(crate) const fn new(
notifications: Option<mpsc::Sender<Box<RawValue>>>,
tasks: TaskSet,
) -> Self {
Self {
notifications,
tasks,
}
}
pub const fn notifications(&self) -> Option<&mpsc::Sender<Box<RawValue>>> {
self.notifications.as_ref()
}
pub fn notifications_enabled(&self) -> bool {
self.notifications
.as_ref()
.map(|tx| !tx.is_closed())
.unwrap_or_default()
}
pub async fn notify<T: RpcSend>(&self, t: &T) -> Result<(), NotifyError> {
if let Some(notifications) = self.notifications.as_ref() {
let rv = serde_json::value::to_raw_value(t)?;
notifications.send(rv).await?;
}
Ok(())
}
pub fn spawn<F>(&self, f: F) -> JoinHandle<Option<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.tasks.spawn_cancellable(f)
}
pub fn spawn_with_ctx<F, Fut>(&self, f: F) -> JoinHandle<Option<Fut::Output>>
where
F: FnOnce(HandlerCtx) -> Fut,
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.tasks.spawn_cancellable(f(self.clone()))
}
pub fn spawn_blocking<F>(&self, f: F) -> JoinHandle<Option<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.tasks.spawn_blocking_cancellable(f)
}
pub fn spawn_blocking_with_ctx<F, Fut>(&self, f: F) -> JoinHandle<Option<Fut::Output>>
where
F: FnOnce(HandlerCtx) -> Fut,
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.tasks.spawn_blocking_cancellable(f(self.clone()))
}
pub fn spawn_graceful<F, Fut>(&self, f: F) -> JoinHandle<Fut::Output>
where
F: FnOnce(WaitForCancellationFutureOwned) -> Fut + Send + 'static,
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.tasks.spawn_graceful(f)
}
pub fn spawn_graceful_with_ctx<F, Fut>(&self, f: F) -> JoinHandle<Fut::Output>
where
F: FnOnce(HandlerCtx, WaitForCancellationFutureOwned) -> Fut + Send + 'static,
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let ctx = self.clone();
self.tasks.spawn_graceful(move |token| f(ctx, token))
}
pub fn spawn_blocking_graceful<F, Fut>(&self, f: F) -> JoinHandle<Fut::Output>
where
F: FnOnce(WaitForCancellationFutureOwned) -> Fut + Send + 'static,
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
self.tasks.spawn_blocking_graceful(f)
}
pub fn spawn_blocking_graceful_with_ctx<F, Fut>(&self, f: F) -> JoinHandle<Fut::Output>
where
F: FnOnce(HandlerCtx, WaitForCancellationFutureOwned) -> Fut + Send + 'static,
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let ctx = self.clone();
self.tasks
.spawn_blocking_graceful(move |token| f(ctx, token))
}
}
#[derive(Debug, Clone)]
pub struct HandlerArgs {
pub(crate) ctx: HandlerCtx,
pub(crate) req: Request,
}
impl HandlerArgs {
pub const fn new(ctx: HandlerCtx, req: Request) -> Self {
Self { ctx, req }
}
pub const fn ctx(&self) -> &HandlerCtx {
&self.ctx
}
pub const fn req(&self) -> &Request {
&self.req
}
}