use serde::de::DeserializeOwned;
use serde::Serialize;
use std::{
any::{Any, TypeId},
collections::HashMap,
fmt,
future::Future,
ops::Deref,
pin::Pin,
sync::{Arc, RwLock},
};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Shared registry of application state, keyed by the `TypeId` of the stored type.
///
/// Every value is type-erased as `Arc<dyn Any + Send + Sync>`, and the map itself
/// is wrapped in `Arc<RwLock<..>>` so it can be cloned into handlers.
pub type SharedStateMap = Arc<RwLock<HashMap<TypeId, Arc<dyn Any + Send + Sync>>>>;
pub fn resolve_state<S: Send + Sync + 'static>(
states: &SharedStateMap,
) -> Result<Arc<S>, String> {
let map = states.read().map_err(|e| format!("State lock poisoned: {e}"))?;
let any = map
.get(&TypeId::of::<S>())
.ok_or_else(|| {
format!(
"State not found: {}. Was with_state::<{0}>() or inject_state::<{0}>() called?",
std::any::type_name::<S>()
)
})?
.clone();
any.downcast::<S>().map_err(|_| {
format!("State type mismatch: expected {}", std::any::type_name::<S>())
})
}
/// Looks up the state stored under `type_id` without downcasting it.
///
/// `type_name` is only used to build the error message.
///
/// # Errors
///
/// Returns a message when the lock is poisoned or when no entry exists for
/// `type_id`.
pub fn resolve_state_erased(
    states: &SharedStateMap,
    type_id: TypeId,
    type_name: &str,
) -> Result<Arc<dyn Any + Send + Sync>, String> {
    let guard = match states.read() {
        Ok(guard) => guard,
        Err(e) => return Err(format!("State lock poisoned: {e}")),
    };
    match guard.get(&type_id) {
        Some(entry) => Ok(Arc::clone(entry)),
        None => Err(format!(
            "State not found: {type_name}. Was with_state::<{type_name}>() or inject_state::<{type_name}>() called?"
        )),
    }
}
/// Conversion of a handler's return value into the `Result<String, String>`
/// that handlers ultimately produce.
pub trait IntoHandlerResult: Send {
/// Consumes the value, yielding the response text or an error message.
fn into_handler_result(self) -> Result<String, String>;
}
/// A `String` is returned unchanged as the successful response (no JSON encoding).
impl IntoHandlerResult for String {
fn into_handler_result(self) -> Result<String, String> {
Ok(self)
}
}
/// Wrapper marking a value that should be serialized to JSON with `serde_json`
/// when it is turned into a handler result or stream item.
pub struct Json<T>(pub T);
impl<T: Serialize + Send> IntoHandlerResult for Json<T> {
fn into_handler_result(self) -> Result<String, String> {
serde_json::to_string(&self.0)
.map_err(|e| format!("Failed to serialize response: {e}"))
}
}
/// `Ok` values are serialized to JSON; `Err` values are rendered via `Display`.
impl<T: Serialize + Send, E: fmt::Display + Send> IntoHandlerResult for Result<T, E> {
    fn into_handler_result(self) -> Result<String, String> {
        // Bail out first with the stringified handler error, then serialize.
        let value = self.map_err(|e| e.to_string())?;
        serde_json::to_string(&value).map_err(|e| format!("Failed to serialize response: {e}"))
    }
}
/// Conversion of a value into the serialized text sent as one stream item.
pub trait IntoStreamItem: Send {
/// Consumes the value, yielding the item text or an error message.
fn into_stream_item(self) -> Result<String, String>;
}
/// A `String` is sent unchanged as the stream item (no JSON encoding).
impl IntoStreamItem for String {
fn into_stream_item(self) -> Result<String, String> {
Ok(self)
}
}
impl<T: Serialize + Send> IntoStreamItem for Json<T> {
fn into_stream_item(self) -> Result<String, String> {
serde_json::to_string(&self.0)
.map_err(|e| format!("Failed to serialize stream item: {e}"))
}
}
/// `Ok` values are serialized to JSON; `Err` values are rendered via `Display`.
impl<T: Serialize + Send, E: fmt::Display + Send> IntoStreamItem for Result<T, E> {
    fn into_stream_item(self) -> Result<String, String> {
        // Bail out first with the stringified error, then serialize.
        let value = self.map_err(|e| e.to_string())?;
        serde_json::to_string(&value).map_err(|e| format!("Failed to serialize stream item: {e}"))
    }
}
/// Failure modes of [`StreamSender::send`].
#[derive(Debug, Clone, PartialEq)]
pub enum StreamError {
    /// The channel could not accept the item because it is closed.
    Closed,
    /// The item could not be converted to its serialized form; carries the message.
    Serialize(String),
}

impl fmt::Display for StreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Serialize(reason) => write!(f, "stream serialization error: {reason}"),
            Self::Closed => f.write_str("stream closed: receiver dropped"),
        }
    }
}

impl std::error::Error for StreamError {}
/// Channel capacity used by [`StreamSender::channel`].
pub const DEFAULT_STREAM_CAPACITY: usize = 64;
/// Sending half of a handler's output stream.
///
/// Holds the channel sender for serialized items together with the
/// cancellation token shared with the matching [`StreamReceiver`].
#[derive(Clone)]
pub struct StreamSender {
// Channel carrying already-serialized stream items.
tx: mpsc::Sender<String>,
// Token shared with the receiver half.
cancel: CancellationToken,
}
/// Receiving half of a handler's output stream.
pub struct StreamReceiver {
// Channel delivering serialized stream items.
rx: mpsc::Receiver<String>,
// Token shared with the sender half; cancelled when this receiver is dropped.
cancel: CancellationToken,
}
impl StreamReceiver {
/// Awaits the next item by forwarding to the inner channel's `recv`.
pub async fn recv(&mut self) -> Option<String> {
self.rx.recv().await
}
}
impl Drop for StreamReceiver {
// Dropping the receiver cancels the shared token, so anyone holding a clone
// of it can observe that the consumer is gone.
fn drop(&mut self) {
self.cancel.cancel();
}
}
/// Debug output exposes only whether the shared token has been cancelled.
impl fmt::Debug for StreamReceiver {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let cancelled = self.cancel.is_cancelled();
        let mut out = f.debug_struct("StreamReceiver");
        out.field("cancelled", &cancelled);
        out.finish()
    }
}
impl StreamSender {
    /// Creates a sender/receiver pair buffering up to
    /// [`DEFAULT_STREAM_CAPACITY`] items.
    pub fn channel() -> (Self, StreamReceiver) {
        Self::with_capacity(DEFAULT_STREAM_CAPACITY)
    }

    /// Creates a sender/receiver pair whose channel buffers up to `capacity`
    /// items. Both halves share one freshly created cancellation token.
    ///
    /// A `capacity` of `0` is raised to `1`, because the underlying bounded
    /// channel panics when asked for a zero-sized buffer.
    pub fn with_capacity(capacity: usize) -> (Self, StreamReceiver) {
        // Clamp instead of letting `mpsc::channel(0)` panic in the caller.
        let (tx, rx) = mpsc::channel(capacity.max(1));
        let cancel = CancellationToken::new();
        (
            Self { tx, cancel: cancel.clone() },
            StreamReceiver { rx, cancel },
        )
    }

    /// Returns a clone of the token shared with the receiver half.
    ///
    /// The token is cancelled by [`StreamSender::cancel`] and when the
    /// [`StreamReceiver`] is dropped.
    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancel.clone()
    }

    /// Cancels the shared token.
    pub fn cancel(&self) {
        self.cancel.cancel();
    }

    /// Serializes `item` and queues it on the channel.
    ///
    /// # Errors
    ///
    /// * [`StreamError::Serialize`] when `item` cannot be converted.
    /// * [`StreamError::Closed`] when the channel rejects the send.
    pub async fn send(&self, item: impl IntoStreamItem) -> Result<(), StreamError> {
        let serialized = item.into_stream_item().map_err(StreamError::Serialize)?;
        self.tx
            .send(serialized)
            .await
            .map_err(|_| StreamError::Closed)
    }

    /// Reports whether the underlying channel is closed.
    pub fn is_closed(&self) -> bool {
        self.tx.is_closed()
    }
}
/// Debug output shows the channel's closed flag and the token's cancelled flag.
impl fmt::Debug for StreamSender {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let closed = self.is_closed();
        let cancelled = self.cancel.is_cancelled();
        let mut out = f.debug_struct("StreamSender");
        out.field("closed", &closed);
        out.field("cancelled", &cancelled);
        out.finish()
    }
}
/// An asynchronous request handler invoked with its arguments as raw text.
pub trait Handler: Send + Sync {
/// Starts the handler for `args` and returns a boxed future that resolves to
/// the response text (`Ok`) or an error message (`Err`).
fn call(&self, args: &str) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>>;
}
/// Wrapper around a piece of state handed to a handler.
///
/// Dereferences to the wrapped value.
#[derive(Debug, Clone)]
pub struct State<S>(pub S);

impl<S> Deref for State<S> {
    type Target = S;

    fn deref(&self) -> &Self::Target {
        let State(inner) = self;
        inner
    }
}
/// [`Handler`] adapter for an async function that takes no arguments.
pub struct HandlerFn<F, Fut, R>
where
    F: Fn() -> Fut + Send + Sync,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    func: F,
    _marker: std::marker::PhantomData<fn() -> R>,
}

impl<F, Fut, R> HandlerFn<F, Fut, R>
where
    F: Fn() -> Fut + Send + Sync,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    /// Wraps `func` so it can be used as a [`Handler`].
    pub fn new(func: F) -> Self {
        let _marker = std::marker::PhantomData;
        Self { func, _marker }
    }
}

impl<F, Fut, R> Handler for HandlerFn<F, Fut, R>
where
    F: Fn() -> Fut + Send + Sync + 'static,
    Fut: Future<Output = R> + Send + 'static,
    R: IntoHandlerResult + 'static,
{
    /// Ignores `_args`, runs the wrapped function and converts its output.
    fn call(&self, _args: &str) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
        let pending = (self.func)();
        Box::pin(async move {
            let output = pending.await;
            output.into_handler_result()
        })
    }
}
/// [`Handler`] adapter for an async function taking one argument of type `T`,
/// deserialized from the JSON text passed to `call`.
// The lint is silenced presumably because of the `PhantomData` marker tuple.
#[allow(clippy::type_complexity)]
pub struct HandlerWithArgs<F, T, Fut, R>
where
    F: Fn(T) -> Fut + Send + Sync,
    T: DeserializeOwned + Send,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    func: F,
    _marker: std::marker::PhantomData<(fn() -> T, fn() -> R)>,
}

impl<F, T, Fut, R> HandlerWithArgs<F, T, Fut, R>
where
    F: Fn(T) -> Fut + Send + Sync,
    T: DeserializeOwned + Send,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    /// Wraps `func` so it can be used as a [`Handler`].
    pub fn new(func: F) -> Self {
        let _marker = std::marker::PhantomData;
        Self { func, _marker }
    }
}

impl<F, T, Fut, R> Handler for HandlerWithArgs<F, T, Fut, R>
where
    F: Fn(T) -> Fut + Send + Sync + 'static,
    T: DeserializeOwned + Send + 'static,
    Fut: Future<Output = R> + Send + 'static,
    R: IntoHandlerResult + 'static,
{
    /// Parses `args` as JSON into `T` and runs the wrapped function; a parse
    /// failure resolves to `Err("Failed to deserialize args: ...")`.
    fn call(&self, args: &str) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
        match serde_json::from_str::<T>(args) {
            Err(e) => Box::pin(async move {
                Err(format!("Failed to deserialize args: {e}"))
            }),
            Ok(value) => {
                let pending = (self.func)(value);
                Box::pin(async move { pending.await.into_handler_result() })
            }
        }
    }
}
/// [`Handler`] adapter for an async function taking shared state `S` and a
/// JSON-deserialized argument `T`.
// The lint is silenced presumably because of the `PhantomData` marker tuple.
#[allow(clippy::type_complexity)]
pub struct HandlerWithState<F, S, T, Fut, R>
where
    F: Fn(State<Arc<S>>, T) -> Fut + Send + Sync,
    S: Send + Sync + 'static,
    T: DeserializeOwned + Send,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    func: F,
    states: SharedStateMap,
    _marker: std::marker::PhantomData<(fn() -> S, fn() -> T, fn() -> R)>,
}

impl<F, S, T, Fut, R> HandlerWithState<F, S, T, Fut, R>
where
    F: Fn(State<Arc<S>>, T) -> Fut + Send + Sync,
    S: Send + Sync + 'static,
    T: DeserializeOwned + Send,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    /// Wraps `func`; `states` is the registry from which `S` is resolved on each call.
    pub fn new(func: F, states: SharedStateMap) -> Self {
        let _marker = std::marker::PhantomData;
        Self { func, states, _marker }
    }
}

impl<F, S, T, Fut, R> Handler for HandlerWithState<F, S, T, Fut, R>
where
    F: Fn(State<Arc<S>>, T) -> Fut + Send + Sync + 'static,
    S: Send + Sync + 'static,
    T: DeserializeOwned + Send + 'static,
    Fut: Future<Output = R> + Send + 'static,
    R: IntoHandlerResult + 'static,
{
    /// Resolves `S` first, then parses `args`; either failure resolves to `Err`
    /// without running the wrapped function.
    fn call(&self, args: &str) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
        let shared = match resolve_state::<S>(&self.states) {
            Ok(found) => State(found),
            Err(msg) => return Box::pin(async move { Err(msg) }),
        };
        match serde_json::from_str::<T>(args) {
            Err(e) => Box::pin(async move {
                Err(format!("Failed to deserialize args: {e}"))
            }),
            Ok(value) => {
                let pending = (self.func)(shared, value);
                Box::pin(async move { pending.await.into_handler_result() })
            }
        }
    }
}
/// [`Handler`] adapter for an async function taking only shared state `S`.
// The lint is silenced presumably because of the `PhantomData` marker tuple.
#[allow(clippy::type_complexity)]
pub struct HandlerWithStateOnly<F, S, Fut, R>
where
    F: Fn(State<Arc<S>>) -> Fut + Send + Sync,
    S: Send + Sync + 'static,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    func: F,
    states: SharedStateMap,
    _marker: std::marker::PhantomData<(fn() -> S, fn() -> R)>,
}

impl<F, S, Fut, R> HandlerWithStateOnly<F, S, Fut, R>
where
    F: Fn(State<Arc<S>>) -> Fut + Send + Sync,
    S: Send + Sync + 'static,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    /// Wraps `func`; `states` is the registry from which `S` is resolved on each call.
    pub fn new(func: F, states: SharedStateMap) -> Self {
        let _marker = std::marker::PhantomData;
        Self { func, states, _marker }
    }
}

impl<F, S, Fut, R> Handler for HandlerWithStateOnly<F, S, Fut, R>
where
    F: Fn(State<Arc<S>>) -> Fut + Send + Sync + 'static,
    S: Send + Sync + 'static,
    Fut: Future<Output = R> + Send + 'static,
    R: IntoHandlerResult + 'static,
{
    /// Ignores `_args`; resolves `S` and runs the wrapped function, or resolves
    /// to `Err` when the state lookup fails.
    fn call(&self, _args: &str) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
        let shared = match resolve_state::<S>(&self.states) {
            Ok(found) => State(found),
            Err(msg) => return Box::pin(async move { Err(msg) }),
        };
        let pending = (self.func)(shared);
        Box::pin(async move {
            let output = pending.await;
            output.into_handler_result()
        })
    }
}
/// Signature of the boxed closure stored inside [`ErasedHandler`]: takes the
/// raw argument text and returns a boxed, `Send` future of the handler result.
pub type HandlerCallFn =
dyn Fn(&str) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send>> + Send + Sync;
/// A handler whose concrete function type has been erased behind a boxed closure.
pub struct ErasedHandler(pub(crate) Box<HandlerCallFn>);

impl ErasedHandler {
    /// Wraps an already-boxed closure.
    pub fn from_closure(f: Box<HandlerCallFn>) -> Self {
        Self(f)
    }

    /// Erases an async function that takes no arguments; the argument text is ignored.
    pub fn no_args<F, Fut, R>(handler: F) -> Self
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: IntoHandlerResult + 'static,
    {
        Self(Box::new(move |_args: &str| {
            let pending = handler();
            Box::pin(async move { pending.await.into_handler_result() })
        }))
    }

    /// Erases an async function taking one JSON-deserialized argument `T`.
    pub fn with_args<F, T, Fut, R>(handler: F) -> Self
    where
        F: Fn(T) -> Fut + Send + Sync + 'static,
        T: DeserializeOwned + Send + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: IntoHandlerResult + 'static,
    {
        Self(Box::new(move |args: &str| {
            match serde_json::from_str::<T>(args) {
                Ok(value) => {
                    let pending = handler(value);
                    Box::pin(async move { pending.await.into_handler_result() })
                }
                Err(e) => Box::pin(async move {
                    Err(format!("Failed to deserialize args: {e}"))
                }),
            }
        }))
    }

    /// Erases an async function taking shared state `S` (resolved from
    /// `states` on every call) and a JSON-deserialized argument `T`.
    pub fn with_state<F, S, T, Fut, R>(handler: F, states: SharedStateMap) -> Self
    where
        F: Fn(State<Arc<S>>, T) -> Fut + Send + Sync + 'static,
        S: Send + Sync + 'static,
        T: DeserializeOwned + Send + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: IntoHandlerResult + 'static,
    {
        Self(Box::new(move |args: &str| {
            let shared = match resolve_state::<S>(&states) {
                Ok(found) => State(found),
                Err(msg) => {
                    return Box::pin(async move { Err(msg) })
                        as Pin<Box<dyn Future<Output = Result<String, String>> + Send>>
                }
            };
            match serde_json::from_str::<T>(args) {
                Ok(value) => {
                    let pending = handler(shared, value);
                    Box::pin(async move { pending.await.into_handler_result() })
                }
                Err(e) => Box::pin(async move {
                    Err(format!("Failed to deserialize args: {e}"))
                }),
            }
        }))
    }

    /// Erases an async function taking only shared state `S`; the argument
    /// text is ignored.
    pub fn with_state_only<F, S, Fut, R>(handler: F, states: SharedStateMap) -> Self
    where
        F: Fn(State<Arc<S>>) -> Fut + Send + Sync + 'static,
        S: Send + Sync + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: IntoHandlerResult + 'static,
    {
        Self(Box::new(move |_args: &str| {
            let shared = match resolve_state::<S>(&states) {
                Ok(found) => State(found),
                Err(msg) => {
                    return Box::pin(async move { Err(msg) })
                        as Pin<Box<dyn Future<Output = Result<String, String>> + Send>>
                }
            };
            let pending = handler(shared);
            Box::pin(async move { pending.await.into_handler_result() })
        }))
    }
}
/// Calling an erased handler simply invokes the stored closure.
impl Handler for ErasedHandler {
fn call(
&self,
args: &str,
) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
(self.0)(args)
}
}
/// An asynchronous handler that is additionally given a [`StreamSender`].
pub trait StreamHandler: Send + Sync {
/// Starts the handler for `args`, passing it `tx`, and returns a boxed future
/// that resolves to the final response text (`Ok`) or an error message (`Err`).
fn call_streaming(
&self,
args: &str,
tx: StreamSender,
) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>>;
}
/// Signature of the boxed closure stored inside [`ErasedStreamHandler`]: takes
/// the raw argument text and a [`StreamSender`], and returns a boxed, `Send`
/// future of the handler result.
pub type StreamHandlerCallFn = dyn Fn(&str, StreamSender) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send>>
+ Send
+ Sync;
/// A streaming handler whose concrete function type has been erased behind a
/// boxed closure.
pub struct ErasedStreamHandler(pub(crate) Box<StreamHandlerCallFn>);

impl ErasedStreamHandler {
    /// Wraps an already-boxed closure.
    pub fn from_closure(f: Box<StreamHandlerCallFn>) -> Self {
        Self(f)
    }

    /// Erases an async function taking only the [`StreamSender`]; the argument
    /// text is ignored.
    pub fn no_args<F, Fut, R>(handler: F) -> Self
    where
        F: Fn(StreamSender) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: IntoHandlerResult + 'static,
    {
        Self(Box::new(move |_args: &str, tx: StreamSender| {
            let pending = handler(tx);
            Box::pin(async move { pending.await.into_handler_result() })
        }))
    }

    /// Erases an async function taking a JSON-deserialized argument `T` and
    /// the [`StreamSender`].
    pub fn with_args<F, T, Fut, R>(handler: F) -> Self
    where
        F: Fn(T, StreamSender) -> Fut + Send + Sync + 'static,
        T: DeserializeOwned + Send + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: IntoHandlerResult + 'static,
    {
        Self(Box::new(move |args: &str, tx: StreamSender| {
            match serde_json::from_str::<T>(args) {
                Ok(value) => {
                    let pending = handler(value, tx);
                    Box::pin(async move { pending.await.into_handler_result() })
                }
                Err(e) => Box::pin(async move {
                    Err(format!("Failed to deserialize args: {e}"))
                }),
            }
        }))
    }

    /// Erases an async function taking shared state `S` (resolved from
    /// `states` on every call), a JSON-deserialized argument `T` and the
    /// [`StreamSender`].
    pub fn with_state<F, S, T, Fut, R>(handler: F, states: SharedStateMap) -> Self
    where
        F: Fn(State<Arc<S>>, T, StreamSender) -> Fut + Send + Sync + 'static,
        S: Send + Sync + 'static,
        T: DeserializeOwned + Send + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: IntoHandlerResult + 'static,
    {
        Self(Box::new(move |args: &str, tx: StreamSender| {
            let shared = match resolve_state::<S>(&states) {
                Ok(found) => State(found),
                Err(msg) => {
                    return Box::pin(async move { Err(msg) })
                        as Pin<Box<dyn Future<Output = Result<String, String>> + Send>>
                }
            };
            match serde_json::from_str::<T>(args) {
                Ok(value) => {
                    let pending = handler(shared, value, tx);
                    Box::pin(async move { pending.await.into_handler_result() })
                }
                Err(e) => Box::pin(async move {
                    Err(format!("Failed to deserialize args: {e}"))
                }),
            }
        }))
    }

    /// Erases an async function taking shared state `S` and the
    /// [`StreamSender`]; the argument text is ignored.
    pub fn with_state_only<F, S, Fut, R>(handler: F, states: SharedStateMap) -> Self
    where
        F: Fn(State<Arc<S>>, StreamSender) -> Fut + Send + Sync + 'static,
        S: Send + Sync + 'static,
        Fut: Future<Output = R> + Send + 'static,
        R: IntoHandlerResult + 'static,
    {
        Self(Box::new(move |_args: &str, tx: StreamSender| {
            let shared = match resolve_state::<S>(&states) {
                Ok(found) => State(found),
                Err(msg) => {
                    return Box::pin(async move { Err(msg) })
                        as Pin<Box<dyn Future<Output = Result<String, String>> + Send>>
                }
            };
            let pending = handler(shared, tx);
            Box::pin(async move { pending.await.into_handler_result() })
        }))
    }
}
/// Calling an erased streaming handler simply invokes the stored closure.
impl StreamHandler for ErasedStreamHandler {
fn call_streaming(
&self,
args: &str,
tx: StreamSender,
) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
(self.0)(args, tx)
}
}
/// [`StreamHandler`] adapter for an async function taking only the
/// [`StreamSender`].
pub struct StreamingHandlerFn<F, Fut, R>
where
    F: Fn(StreamSender) -> Fut + Send + Sync,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    func: F,
    _marker: std::marker::PhantomData<fn() -> R>,
}

impl<F, Fut, R> StreamingHandlerFn<F, Fut, R>
where
    F: Fn(StreamSender) -> Fut + Send + Sync,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    /// Wraps `func` so it can be used as a [`StreamHandler`].
    pub fn new(func: F) -> Self {
        let _marker = std::marker::PhantomData;
        Self { func, _marker }
    }
}

impl<F, Fut, R> StreamHandler for StreamingHandlerFn<F, Fut, R>
where
    F: Fn(StreamSender) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = R> + Send + 'static,
    R: IntoHandlerResult + 'static,
{
    /// Ignores `_args`, hands `tx` to the wrapped function and converts its output.
    fn call_streaming(
        &self,
        _args: &str,
        tx: StreamSender,
    ) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
        let pending = (self.func)(tx);
        Box::pin(async move {
            let output = pending.await;
            output.into_handler_result()
        })
    }
}
/// [`StreamHandler`] adapter for an async function taking a JSON-deserialized
/// argument `T` and the [`StreamSender`].
// The lint is silenced presumably because of the `PhantomData` marker tuple.
#[allow(clippy::type_complexity)]
pub struct StreamingHandlerWithArgs<F, T, Fut, R>
where
    F: Fn(T, StreamSender) -> Fut + Send + Sync,
    T: DeserializeOwned + Send,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    func: F,
    _marker: std::marker::PhantomData<(fn() -> T, fn() -> R)>,
}

impl<F, T, Fut, R> StreamingHandlerWithArgs<F, T, Fut, R>
where
    F: Fn(T, StreamSender) -> Fut + Send + Sync,
    T: DeserializeOwned + Send,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    /// Wraps `func` so it can be used as a [`StreamHandler`].
    pub fn new(func: F) -> Self {
        let _marker = std::marker::PhantomData;
        Self { func, _marker }
    }
}

impl<F, T, Fut, R> StreamHandler for StreamingHandlerWithArgs<F, T, Fut, R>
where
    F: Fn(T, StreamSender) -> Fut + Send + Sync + 'static,
    T: DeserializeOwned + Send + 'static,
    Fut: Future<Output = R> + Send + 'static,
    R: IntoHandlerResult + 'static,
{
    /// Parses `args` as JSON into `T` and runs the wrapped function with `tx`;
    /// a parse failure resolves to `Err("Failed to deserialize args: ...")`.
    fn call_streaming(
        &self,
        args: &str,
        tx: StreamSender,
    ) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
        match serde_json::from_str::<T>(args) {
            Err(e) => Box::pin(async move {
                Err(format!("Failed to deserialize args: {e}"))
            }),
            Ok(value) => {
                let pending = (self.func)(value, tx);
                Box::pin(async move { pending.await.into_handler_result() })
            }
        }
    }
}
/// [`StreamHandler`] adapter for an async function taking shared state `S`, a
/// JSON-deserialized argument `T` and the [`StreamSender`].
// The lint is silenced presumably because of the `PhantomData` marker tuple.
#[allow(clippy::type_complexity)]
pub struct StreamingHandlerWithState<F, S, T, Fut, R>
where
    F: Fn(State<Arc<S>>, T, StreamSender) -> Fut + Send + Sync,
    S: Send + Sync + 'static,
    T: DeserializeOwned + Send,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    func: F,
    states: SharedStateMap,
    _marker: std::marker::PhantomData<(fn() -> S, fn() -> T, fn() -> R)>,
}

impl<F, S, T, Fut, R> StreamingHandlerWithState<F, S, T, Fut, R>
where
    F: Fn(State<Arc<S>>, T, StreamSender) -> Fut + Send + Sync,
    S: Send + Sync + 'static,
    T: DeserializeOwned + Send,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    /// Wraps `func`; `states` is the registry from which `S` is resolved on each call.
    pub fn new(func: F, states: SharedStateMap) -> Self {
        let _marker = std::marker::PhantomData;
        Self { func, states, _marker }
    }
}

impl<F, S, T, Fut, R> StreamHandler for StreamingHandlerWithState<F, S, T, Fut, R>
where
    F: Fn(State<Arc<S>>, T, StreamSender) -> Fut + Send + Sync + 'static,
    S: Send + Sync + 'static,
    T: DeserializeOwned + Send + 'static,
    Fut: Future<Output = R> + Send + 'static,
    R: IntoHandlerResult + 'static,
{
    /// Resolves `S` first, then parses `args`; either failure resolves to `Err`
    /// without running the wrapped function.
    fn call_streaming(
        &self,
        args: &str,
        tx: StreamSender,
    ) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
        let shared = match resolve_state::<S>(&self.states) {
            Ok(found) => State(found),
            Err(msg) => return Box::pin(async move { Err(msg) }),
        };
        match serde_json::from_str::<T>(args) {
            Err(e) => Box::pin(async move {
                Err(format!("Failed to deserialize args: {e}"))
            }),
            Ok(value) => {
                let pending = (self.func)(shared, value, tx);
                Box::pin(async move { pending.await.into_handler_result() })
            }
        }
    }
}
/// [`StreamHandler`] adapter for an async function taking shared state `S` and
/// the [`StreamSender`].
// The lint is silenced presumably because of the `PhantomData` marker tuple.
#[allow(clippy::type_complexity)]
pub struct StreamingHandlerWithStateOnly<F, S, Fut, R>
where
    F: Fn(State<Arc<S>>, StreamSender) -> Fut + Send + Sync,
    S: Send + Sync + 'static,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    func: F,
    states: SharedStateMap,
    _marker: std::marker::PhantomData<(fn() -> S, fn() -> R)>,
}

impl<F, S, Fut, R> StreamingHandlerWithStateOnly<F, S, Fut, R>
where
    F: Fn(State<Arc<S>>, StreamSender) -> Fut + Send + Sync,
    S: Send + Sync + 'static,
    Fut: Future<Output = R> + Send,
    R: IntoHandlerResult,
{
    /// Wraps `func`; `states` is the registry from which `S` is resolved on each call.
    pub fn new(func: F, states: SharedStateMap) -> Self {
        let _marker = std::marker::PhantomData;
        Self { func, states, _marker }
    }
}

impl<F, S, Fut, R> StreamHandler for StreamingHandlerWithStateOnly<F, S, Fut, R>
where
    F: Fn(State<Arc<S>>, StreamSender) -> Fut + Send + Sync + 'static,
    S: Send + Sync + 'static,
    Fut: Future<Output = R> + Send + 'static,
    R: IntoHandlerResult + 'static,
{
    /// Ignores `_args`; resolves `S` and runs the wrapped function with `tx`,
    /// or resolves to `Err` when the state lookup fails.
    fn call_streaming(
        &self,
        _args: &str,
        tx: StreamSender,
    ) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
        let shared = match resolve_state::<S>(&self.states) {
            Ok(found) => State(found),
            Err(msg) => return Box::pin(async move { Err(msg) }),
        };
        let pending = (self.func)(shared, tx);
        Box::pin(async move {
            let output = pending.await;
            output.into_handler_result()
        })
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Builds a [`SharedStateMap`] containing `value` registered under its own `TypeId`.
fn state_map<S: Send + Sync + 'static>(value: S) -> SharedStateMap {
    let erased: Arc<dyn Any + Send + Sync> = Arc::new(value);
    let mut registry = HashMap::new();
    registry.insert(TypeId::of::<S>(), erased);
    Arc::new(RwLock::new(registry))
}
/// A no-argument handler returns its string unchanged.
#[tokio::test]
async fn test_handler_fn() {
    let no_arg_handler = HandlerFn::new(|| async { "test".to_string() });
    let outcome = no_arg_handler.call("{}").await;
    assert_eq!(outcome, Ok("test".to_string()));
}
/// A no-argument handler ignores whatever argument text it is given.
#[tokio::test]
async fn test_handler_fn_ignores_args() {
    let no_arg_handler = HandlerFn::new(|| async { "no-args".to_string() });
    let outcome = no_arg_handler.call(r#"{"unexpected": true}"#).await;
    assert_eq!(outcome, Ok("no-args".to_string()));
}
/// Arguments are deserialized from JSON and passed to the handler.
#[tokio::test]
async fn test_handler_with_args() {
    #[derive(serde::Deserialize)]
    struct Input {
        name: String,
    }

    let greeter = HandlerWithArgs::new(|args: Input| async move {
        format!("hello {}", args.name)
    });
    let outcome = greeter.call(r#"{"name":"Alice"}"#).await;
    assert_eq!(outcome, Ok("hello Alice".to_string()));
}
/// Malformed JSON yields a deserialization error instead of running the handler.
#[tokio::test]
async fn test_handler_with_args_bad_json() {
    #[derive(serde::Deserialize)]
    struct Input {
        _name: String,
    }

    let never_runs = HandlerWithArgs::new(|_args: Input| async move {
        "unreachable".to_string()
    });
    let outcome = never_runs.call("not-json").await;
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().contains("Failed to deserialize args"));
}
/// Valid JSON lacking a required field also yields a deserialization error.
#[tokio::test]
async fn test_handler_with_args_missing_field() {
    #[derive(serde::Deserialize)]
    struct Input {
        _name: String,
    }

    let never_runs = HandlerWithArgs::new(|_args: Input| async move {
        "unreachable".to_string()
    });
    let outcome = never_runs.call(r#"{"age": 30}"#).await;
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().contains("Failed to deserialize args"));
}
/// A stateful handler receives both the registered state and the parsed args.
#[tokio::test]
async fn test_handler_with_state() {
    struct AppState {
        greeting: String,
    }
    #[derive(serde::Deserialize)]
    struct Input {
        name: String,
    }

    let registry = state_map(AppState {
        greeting: "Hi".to_string(),
    });
    let greeter = HandlerWithState::new(
        |state: State<Arc<AppState>>, args: Input| async move {
            format!("{} {}", state.greeting, args.name)
        },
        registry,
    );
    let outcome = greeter.call(r#"{"name":"Bob"}"#).await;
    assert_eq!(outcome, Ok("Hi Bob".to_string()));
}
/// A state-only handler can read the registered state.
#[tokio::test]
async fn test_handler_with_state_only() {
    struct AppState {
        value: i32,
    }

    let registry = state_map(AppState { value: 42 });
    let reader = HandlerWithStateOnly::new(
        |state: State<Arc<AppState>>| async move { format!("value={}", state.value) },
        registry,
    );
    let outcome = reader.call("{}").await;
    assert_eq!(outcome, Ok("value=42".to_string()));
}
/// With state present, bad argument text still produces a deserialization error.
#[tokio::test]
async fn test_handler_with_state_deser_error() {
    struct AppState;
    #[derive(serde::Deserialize)]
    struct Input {
        _x: i32,
    }

    let registry = state_map(AppState);
    let never_runs = HandlerWithState::new(
        |_state: State<Arc<AppState>>, _args: Input| async move {
            "unreachable".to_string()
        },
        registry,
    );
    let outcome = never_runs.call("bad").await;
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().contains("Failed to deserialize args"));
}
/// A `Json`-wrapped struct is serialized to JSON text.
#[tokio::test]
async fn test_json_handler_fn_struct() {
    #[derive(serde::Serialize)]
    struct User {
        id: u32,
        name: String,
    }

    let user_handler = HandlerFn::new(|| async {
        Json(User {
            id: 1,
            name: "Alice".to_string(),
        })
    });
    let outcome = user_handler.call("{}").await;
    assert_eq!(outcome, Ok(r#"{"id":1,"name":"Alice"}"#.to_string()));
}
/// A `Json`-wrapped vector is serialized as a JSON array.
#[tokio::test]
async fn test_json_handler_fn_vec() {
    let list_handler = HandlerFn::new(|| async { Json(vec![1, 2, 3]) });
    let outcome = list_handler.call("{}").await;
    assert_eq!(outcome, Ok("[1,2,3]".to_string()));
}
/// Parsed args flow into a `Json` response.
#[tokio::test]
async fn test_json_handler_with_args() {
    #[derive(serde::Deserialize)]
    struct Input {
        name: String,
    }
    #[derive(serde::Serialize)]
    struct Output {
        greeting: String,
    }

    let greeter = HandlerWithArgs::new(|args: Input| async move {
        Json(Output {
            greeting: format!("Hello {}", args.name),
        })
    });
    let outcome = greeter.call(r#"{"name":"Bob"}"#).await;
    assert_eq!(outcome, Ok(r#"{"greeting":"Hello Bob"}"#.to_string()));
}
/// Bad argument text fails before a `Json`-returning handler runs.
#[tokio::test]
async fn test_json_handler_with_args_bad_json() {
    #[derive(serde::Deserialize)]
    struct Input {
        _x: i32,
    }

    let never_runs = HandlerWithArgs::new(|_: Input| async move { Json(42) });
    let outcome = never_runs.call("bad").await;
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().contains("Failed to deserialize args"));
}
/// State and args combine into a `Json` response.
#[tokio::test]
async fn test_json_handler_with_state() {
    struct AppState {
        prefix: String,
    }
    #[derive(serde::Deserialize)]
    struct Input {
        name: String,
    }
    #[derive(serde::Serialize)]
    struct Output {
        message: String,
    }

    let registry = state_map(AppState {
        prefix: "Hi".to_string(),
    });
    let greeter = HandlerWithState::new(
        |state: State<Arc<AppState>>, args: Input| async move {
            Json(Output {
                message: format!("{} {}", state.prefix, args.name),
            })
        },
        registry,
    );
    let outcome = greeter.call(r#"{"name":"Charlie"}"#).await;
    assert_eq!(outcome, Ok(r#"{"message":"Hi Charlie"}"#.to_string()));
}
/// A state-only handler can return a `Json` response built from the state.
#[tokio::test]
async fn test_json_handler_with_state_only() {
    struct AppState {
        version: String,
    }
    #[derive(serde::Serialize)]
    struct Info {
        version: String,
    }

    let registry = state_map(AppState {
        version: "1.0".to_string(),
    });
    let info_handler = HandlerWithStateOnly::new(
        |state: State<Arc<AppState>>| async move {
            Json(Info {
                version: state.version.clone(),
            })
        },
        registry,
    );
    let outcome = info_handler.call("{}").await;
    assert_eq!(outcome, Ok(r#"{"version":"1.0"}"#.to_string()));
}
/// An `Ok` result from a handler is serialized to JSON.
#[tokio::test]
async fn test_result_handler_fn_ok() {
    #[derive(serde::Serialize)]
    struct Data {
        value: i32,
    }

    let ok_handler = HandlerFn::new(|| async { Ok::<_, String>(Data { value: 42 }) });
    let outcome = ok_handler.call("{}").await;
    assert_eq!(outcome, Ok(r#"{"value":42}"#.to_string()));
}
/// An `Err` result from a handler is passed through as the error message.
#[tokio::test]
async fn test_result_handler_fn_err() {
    #[derive(serde::Serialize)]
    struct Data {
        value: i32,
    }

    let failing = HandlerFn::new(|| async {
        Err::<Data, String>("something went wrong".to_string())
    });
    let outcome = failing.call("{}").await;
    assert_eq!(outcome, Err("something went wrong".to_string()));
}
/// A `Result`-returning handler with args serializes its `Ok` value.
#[tokio::test]
async fn test_result_handler_with_args_ok() {
    #[derive(serde::Deserialize)]
    struct Input {
        x: i32,
    }
    #[derive(serde::Serialize)]
    struct Output {
        doubled: i32,
    }

    let doubler = HandlerWithArgs::new(|args: Input| async move {
        Ok::<_, String>(Output { doubled: args.x * 2 })
    });
    let outcome = doubler.call(r#"{"x":21}"#).await;
    assert_eq!(outcome, Ok(r#"{"doubled":42}"#.to_string()));
}
/// A `Result`-returning handler with args surfaces its `Err` message.
#[tokio::test]
async fn test_result_handler_with_args_err() {
    #[derive(serde::Deserialize)]
    struct Input {
        x: i32,
    }

    let validator = HandlerWithArgs::new(|args: Input| async move {
        if args.x < 0 {
            Err::<i32, String>("negative".to_string())
        } else {
            Ok(args.x)
        }
    });
    let outcome = validator.call(r#"{"x":-1}"#).await;
    assert_eq!(outcome, Err("negative".to_string()));
}
/// A stateful `Result`-returning handler covers both the `Ok` and `Err` paths.
#[tokio::test]
async fn test_result_handler_with_state() {
    struct AppState {
        threshold: i32,
    }
    #[derive(serde::Deserialize)]
    struct Input {
        value: i32,
    }
    #[derive(serde::Serialize)]
    struct Output {
        accepted: bool,
    }

    let registry = state_map(AppState { threshold: 10 });
    let gate = HandlerWithState::new(
        |state: State<Arc<AppState>>, args: Input| async move {
            if args.value >= state.threshold {
                Ok::<_, String>(Output { accepted: true })
            } else {
                Err("below threshold".to_string())
            }
        },
        registry,
    );

    let accepted = gate.call(r#"{"value":15}"#).await;
    assert_eq!(accepted, Ok(r#"{"accepted":true}"#.to_string()));

    let rejected = gate.call(r#"{"value":5}"#).await;
    assert_eq!(rejected, Err("below threshold".to_string()));
}
/// A state-only `Result`-returning handler serializes its `Ok` value.
#[tokio::test]
async fn test_result_handler_with_state_only() {
    struct AppState {
        ready: bool,
    }
    #[derive(serde::Serialize)]
    struct Status {
        ok: bool,
    }

    let registry = state_map(AppState { ready: true });
    let health = HandlerWithStateOnly::new(
        |state: State<Arc<AppState>>| async move {
            if state.ready {
                Ok::<_, String>(Status { ok: true })
            } else {
                Err("not ready".to_string())
            }
        },
        registry,
    );
    let outcome = health.call("{}").await;
    assert_eq!(outcome, Ok(r#"{"ok":true}"#.to_string()));
}
/// A `String` stream item is passed through unchanged.
#[test]
fn test_into_stream_item_string() {
    let raw = "hello".to_string();
    assert_eq!(raw.into_stream_item(), Ok("hello".to_string()));
}
/// A `Json`-wrapped struct stream item is serialized to JSON.
#[test]
fn test_into_stream_item_json() {
    #[derive(serde::Serialize)]
    struct Token {
        text: String,
    }

    let wrapped = Json(Token {
        text: "hi".to_string(),
    });
    assert_eq!(
        wrapped.into_stream_item(),
        Ok(r#"{"text":"hi"}"#.to_string())
    );
}
/// A `Json`-wrapped vector stream item is serialized as a JSON array.
#[test]
fn test_into_stream_item_json_vec() {
    let wrapped = Json(vec![1, 2, 3]);
    assert_eq!(wrapped.into_stream_item(), Ok("[1,2,3]".to_string()));
}
/// An `Ok` stream item is serialized to JSON.
#[test]
fn test_into_stream_item_result_ok() {
    #[derive(serde::Serialize)]
    struct Data {
        v: i32,
    }

    let success: Result<Data, String> = Ok(Data { v: 42 });
    assert_eq!(success.into_stream_item(), Ok(r#"{"v":42}"#.to_string()));
}
/// An `Err` stream item is passed through as the error message.
#[test]
fn test_into_stream_item_result_err() {
    let failure: Result<i32, String> = Err("bad".to_string());
    assert_eq!(failure.into_stream_item(), Err("bad".to_string()));
}
/// `StreamError::Closed` renders its fixed message.
#[test]
fn test_stream_error_display_closed() {
    let closed = StreamError::Closed;
    assert_eq!(closed.to_string(), "stream closed: receiver dropped");
}
/// `StreamError::Serialize` includes the carried message.
#[test]
fn test_stream_error_display_serialize() {
    let failure = StreamError::Serialize("bad json".to_string());
    assert_eq!(failure.to_string(), "stream serialization error: bad json");
}
/// `StreamError` can be boxed as a `std::error::Error`.
#[test]
fn test_stream_error_is_std_error() {
    let boxed: Box<dyn std::error::Error> = Box::new(StreamError::Closed);
    assert!(boxed.to_string().contains("closed"));
}
/// Items arrive in send order, and the stream ends once the sender is dropped.
#[tokio::test]
async fn test_stream_sender_send_and_receive() {
    let (sender, mut receiver) = StreamSender::channel();
    sender.send("hello".to_string()).await.unwrap();
    sender.send("world".to_string()).await.unwrap();
    drop(sender);

    assert_eq!(receiver.recv().await, Some("hello".to_string()));
    assert_eq!(receiver.recv().await, Some("world".to_string()));
    assert_eq!(receiver.recv().await, None);
}
/// A `Json` item is serialized before it is delivered.
#[tokio::test]
async fn test_stream_sender_send_json() {
    #[derive(serde::Serialize)]
    struct Token {
        t: String,
    }

    let (sender, mut receiver) = StreamSender::channel();
    let item = Json(Token {
        t: "hi".to_string(),
    });
    sender.send(item).await.unwrap();
    drop(sender);

    assert_eq!(receiver.recv().await, Some(r#"{"t":"hi"}"#.to_string()));
}
/// `is_closed` flips to true once the receiver is dropped.
#[tokio::test]
async fn test_stream_sender_closed_detection() {
    let (sender, receiver) = StreamSender::channel();
    assert!(!sender.is_closed());

    drop(receiver);
    assert!(sender.is_closed());
}
/// Sending after the receiver is gone reports `StreamError::Closed`.
#[tokio::test]
async fn test_stream_sender_send_after_close() {
    let (sender, receiver) = StreamSender::channel();
    drop(receiver);

    let outcome = sender.send("late".to_string()).await;
    assert_eq!(outcome, Err(StreamError::Closed));
}
/// A capacity-2 channel accepts two items, then a third after draining.
#[tokio::test]
async fn test_stream_sender_custom_capacity() {
    let (sender, mut receiver) = StreamSender::with_capacity(2);
    sender.send("a".to_string()).await.unwrap();
    sender.send("b".to_string()).await.unwrap();

    assert_eq!(receiver.recv().await, Some("a".to_string()));
    assert_eq!(receiver.recv().await, Some("b".to_string()));

    sender.send("c".to_string()).await.unwrap();
    assert_eq!(receiver.recv().await, Some("c".to_string()));
}
/// Pins the value of the default capacity constant.
#[tokio::test]
async fn test_stream_sender_default_capacity() {
    let expected: usize = 64;
    assert_eq!(DEFAULT_STREAM_CAPACITY, expected);
}
/// Cloned senders feed the same receiver; the stream ends when all are dropped.
#[tokio::test]
async fn test_stream_sender_clone() {
    let (first, mut receiver) = StreamSender::channel();
    let second = first.clone();

    first.send("from-tx1".to_string()).await.unwrap();
    second.send("from-tx2".to_string()).await.unwrap();
    drop(first);
    drop(second);

    assert_eq!(receiver.recv().await, Some("from-tx1".to_string()));
    assert_eq!(receiver.recv().await, Some("from-tx2".to_string()));
    assert_eq!(receiver.recv().await, None);
}
/// The `Debug` output names the type.
#[test]
fn test_stream_sender_debug() {
    let (sender, _rx) = StreamSender::channel();
    let rendered = format!("{:?}", sender);
    assert!(rendered.contains("StreamSender"));
}
/// A fresh channel's token starts out not cancelled.
#[tokio::test]
async fn test_cancellation_token_not_cancelled_initially() {
    // `_rx` must stay bound: dropping the receiver would cancel the token.
    let (sender, _rx) = StreamSender::channel();
    let observed = sender.cancellation_token();
    assert!(!observed.is_cancelled());
}
/// Calling `cancel` on the sender cancels previously obtained tokens.
#[tokio::test]
async fn test_cancellation_token_cancelled_on_explicit_cancel() {
    // `_rx` must stay bound: dropping the receiver would cancel the token.
    let (sender, _rx) = StreamSender::channel();
    let observed = sender.cancellation_token();
    assert!(!observed.is_cancelled());

    sender.cancel();
    assert!(observed.is_cancelled());
}
/// `cancelled()` resolves once another task calls `cancel`.
#[tokio::test]
async fn test_cancellation_token_cancelled_future_resolves() {
    // `_rx` must stay bound: dropping the receiver would cancel the token.
    let (sender, _rx) = StreamSender::channel();
    let observed = sender.cancellation_token();
    let canceller = sender.clone();

    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        canceller.cancel();
    });

    let deadline = std::time::Duration::from_secs(1);
    tokio::time::timeout(deadline, observed.cancelled())
        .await
        .expect("cancelled future should resolve");
}
/// Tokens from the sender and from its clone are all cancelled together.
#[tokio::test]
async fn test_cancellation_token_shared_across_clones() {
    // `_rx` must stay bound: dropping the receiver would cancel the token.
    let (sender, _rx) = StreamSender::channel();
    let first = sender.cancellation_token();
    let second = sender.cancellation_token();
    let cloned_sender = sender.clone();
    let third = cloned_sender.cancellation_token();

    sender.cancel();

    assert!(first.is_cancelled());
    assert!(second.is_cancelled());
    assert!(third.is_cancelled());
}
/// Dropping the receiving half cancels the token without any explicit
/// `cancel()` call; the check happens synchronously right after the drop.
#[tokio::test]
async fn test_cancellation_token_auto_cancelled_on_receiver_drop() {
    let (sender, receiver) = StreamSender::channel();
    let token = sender.cancellation_token();
    assert!(!token.is_cancelled());

    drop(receiver);
    assert!(token.is_cancelled());
}
/// Awaiting `token.cancelled()` completes when the receiver is dropped from
/// another task; a one-second timeout guards against hanging.
#[tokio::test]
async fn test_cancellation_token_auto_cancel_future_resolves_on_drop() {
    use std::time::Duration;

    let (sender, receiver) = StreamSender::channel();
    let token = sender.cancellation_token();

    // The background task owns the receiver and drops it after a short delay.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        drop(receiver);
    });

    let bounded_wait = tokio::time::timeout(Duration::from_secs(1), token.cancelled());
    bounded_wait
        .await
        .expect("cancelled future should resolve when receiver is dropped");
}
/// An argument-less streaming handler emits two items on the channel and
/// returns its final value as the call result.
#[tokio::test]
async fn test_streaming_handler_fn() {
    let handler = StreamingHandlerFn::new(|out: StreamSender| async move {
        for item in ["item1", "item2"] {
            out.send(item.to_string()).await.ok();
        }
        "done".to_string()
    });

    let (sender, mut receiver) = StreamSender::channel();
    let outcome = handler.call_streaming("{}", sender).await;

    assert_eq!(outcome, Ok(String::from("done")));
    for expected in ["item1", "item2"] {
        assert_eq!(receiver.recv().await.as_deref(), Some(expected));
    }
}
/// A streaming handler that takes deserialized arguments sends `count` items
/// and reports how many it sent.
#[tokio::test]
async fn test_streaming_handler_with_args() {
    #[derive(serde::Deserialize)]
    struct Input {
        count: usize,
    }

    let handler = StreamingHandlerWithArgs::new(|args: Input, out: StreamSender| async move {
        for i in 0..args.count {
            out.send(format!("item-{i}")).await.ok();
        }
        format!("sent {}", args.count)
    });

    let (sender, mut receiver) = StreamSender::channel();
    let outcome = handler.call_streaming(r#"{"count":3}"#, sender).await;

    assert_eq!(outcome, Ok(String::from("sent 3")));
    // Items arrive in the order they were sent.
    for i in 0..3 {
        assert_eq!(receiver.recv().await, Some(format!("item-{i}")));
    }
}
#[tokio::test]
async fn test_streaming_handler_with_args_bad_json() {
#[derive(serde::Deserialize)]
struct Input {
_x: i32,
}
let handler =
StreamingHandlerWithArgs::new(|_args: Input, _tx: StreamSender| async move {
"unreachable".to_string()
});
let (tx, _rx) = StreamSender::channel();
let result = handler.call_streaming("bad-json", tx).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("Failed to deserialize args"));
}
/// A streaming handler receiving both shared state and deserialized arguments
/// combines them into the single item it streams.
#[tokio::test]
async fn test_streaming_handler_with_state() {
    struct AppState {
        prefix: String,
    }
    #[derive(serde::Deserialize)]
    struct Input {
        name: String,
    }

    let shared = state_map(AppState {
        prefix: "Hi".to_string(),
    });
    let handler = StreamingHandlerWithState::new(
        |state: State<Arc<AppState>>, args: Input, out: StreamSender| async move {
            let greeting = format!("{} {}", state.prefix, args.name);
            out.send(greeting).await.ok();
            "done".to_string()
        },
        shared,
    );

    let (sender, mut receiver) = StreamSender::channel();
    let outcome = handler
        .call_streaming(r#"{"name":"Alice"}"#, sender)
        .await;

    assert_eq!(outcome, Ok(String::from("done")));
    assert_eq!(receiver.recv().await.as_deref(), Some("Hi Alice"));
}
/// A streaming handler that takes only shared state streams every stored item
/// and returns a summary of how many were sent.
#[tokio::test]
async fn test_streaming_handler_with_state_only() {
    struct AppState {
        items: Vec<String>,
    }

    let shared = state_map(AppState {
        items: vec!["a".to_string(), "b".to_string()],
    });
    let handler = StreamingHandlerWithStateOnly::new(
        |state: State<Arc<AppState>>, out: StreamSender| async move {
            for item in state.items.iter().cloned() {
                out.send(item).await.ok();
            }
            format!("sent {}", state.items.len())
        },
        shared,
    );

    let (sender, mut receiver) = StreamSender::channel();
    let outcome = handler.call_streaming("{}", sender).await;

    assert_eq!(outcome, Ok(String::from("sent 2")));
    for expected in ["a", "b"] {
        assert_eq!(receiver.recv().await.as_deref(), Some(expected));
    }
}
#[tokio::test]
async fn test_streaming_handler_with_state_type_mismatch() {
struct WrongState;
struct AppState;
let states = state_map(WrongState);
let handler = StreamingHandlerWithStateOnly::new(
|_state: State<Arc<AppState>>, _tx: StreamSender| async move {
"unreachable".to_string()
},
states,
);
let (tx, _rx) = StreamSender::channel();
let result = handler.call_streaming("{}", tx).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("State not found"));
}
/// A streaming handler returning `Json<T>` has its final value serialized to
/// a JSON string, while streamed items are delivered unchanged.
#[tokio::test]
async fn test_streaming_handler_json_return() {
    #[derive(serde::Serialize)]
    struct Summary {
        count: usize,
    }

    let handler = StreamingHandlerFn::new(|out: StreamSender| async move {
        out.send("item".to_string()).await.ok();
        let summary = Summary { count: 1 };
        Json(summary)
    });

    let (sender, mut receiver) = StreamSender::channel();
    let outcome = handler.call_streaming("{}", sender).await;

    assert_eq!(outcome.as_deref(), Ok(r#"{"count":1}"#));
    assert_eq!(receiver.recv().await.as_deref(), Some("item"));
}
/// A streaming handler returning `Result<T, E>` has its `Ok` payload
/// serialized to JSON (here the integer `42` becomes `"42"`).
#[tokio::test]
async fn test_streaming_handler_result_return() {
    let handler = StreamingHandlerFn::new(|out: StreamSender| async move {
        out.send("progress".to_string()).await.ok();
        let answer: Result<i32, String> = Ok(42);
        answer
    });

    let (sender, mut receiver) = StreamSender::channel();
    let outcome = handler.call_streaming("{}", sender).await;

    assert_eq!(outcome.as_deref(), Ok("42"));
    assert_eq!(receiver.recv().await.as_deref(), Some("progress"));
}
/// An `ErasedHandler` built from a raw boxed closure returns whatever the
/// closure's future resolves to, ignoring the argument string.
#[tokio::test]
async fn test_erased_handler_from_closure_no_args() {
    // Shorthand for the boxed future type the closure is cast to.
    type HandlerFuture = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    let handler = ErasedHandler::from_closure(Box::new(|_args: &str| {
        let reply = async { Ok::<_, String>("hello".to_string()) };
        Box::pin(reply) as HandlerFuture
    }));

    let outcome = handler.call("{}").await;
    assert_eq!(outcome.as_deref(), Ok("hello"));
}
/// An `ErasedHandler` built from a raw boxed closure can parse its JSON
/// argument string itself and use the parsed value in the response.
#[tokio::test]
async fn test_erased_handler_from_closure_with_args() {
    #[derive(serde::Deserialize)]
    struct Input {
        name: String,
    }

    // Shorthand for the boxed future type the closure is cast to.
    type HandlerFuture = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    let handler = ErasedHandler::from_closure(Box::new(|args: &str| {
        // Parse up front so the returned future owns its data instead of
        // borrowing `args`.
        let parsed = serde_json::from_str::<Input>(args).map_err(|e| e.to_string());
        Box::pin(async move { parsed.map(|input| format!("hello {}", input.name)) })
            as HandlerFuture
    }));

    let outcome = handler.call(r#"{"name":"Alice"}"#).await;
    assert_eq!(outcome.as_deref(), Ok("hello Alice"));
}
/// `ErasedHandler::no_args` wraps a zero-argument async function; the argument
/// string passed to `call` does not affect the result.
#[tokio::test]
async fn test_erased_handler_no_args_constructor() {
    let handler = ErasedHandler::no_args(|| async { "zero-arg".to_string() });

    let outcome = handler.call("ignored").await;

    assert_eq!(outcome.as_deref(), Ok("zero-arg"));
}
/// `ErasedHandler::with_args` deserializes the JSON argument string into the
/// handler's input type before invoking it.
#[tokio::test]
async fn test_erased_handler_with_args_constructor() {
    #[derive(serde::Deserialize)]
    struct Input {
        name: String,
    }

    let handler =
        ErasedHandler::with_args(|input: Input| async move { format!("hi {}", input.name) });

    let outcome = handler.call(r#"{"name":"Bob"}"#).await;

    assert_eq!(outcome.as_deref(), Ok("hi Bob"));
}
/// `ErasedHandler::with_state` supplies the registered state alongside the
/// deserialized arguments; only the state is used in the response here.
#[tokio::test]
async fn test_erased_handler_with_state_constructor() {
    #[derive(serde::Deserialize)]
    struct Input {
        #[allow(dead_code)]
        name: String,
    }

    let shared = state_map("shared-state".to_string());
    let handler = ErasedHandler::with_state(
        |state: State<Arc<String>>, _input: Input| async move { format!("state={}", *state) },
        shared,
    );

    let outcome = handler.call(r#"{"name":"x"}"#).await;

    assert_eq!(outcome.as_deref(), Ok("state=shared-state"));
}
/// `ErasedHandler::with_state_only` supplies the registered state to a handler
/// that takes no other arguments.
#[tokio::test]
async fn test_erased_handler_with_state_only_constructor() {
    let shared = state_map(42u32);
    let handler = ErasedHandler::with_state_only(
        |state: State<Arc<u32>>| async move { format!("n={}", *state) },
        shared,
    );

    let outcome = handler.call("{}").await;

    assert_eq!(outcome.as_deref(), Ok("n=42"));
}
/// An `ErasedStreamHandler` built from a raw boxed closure can stream an item
/// through the provided sender and return a final result.
#[tokio::test]
async fn test_erased_stream_handler_from_closure() {
    // Shorthand for the boxed future type the closure is cast to.
    type HandlerFuture = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    let handler =
        ErasedStreamHandler::from_closure(Box::new(|_args: &str, out: StreamSender| {
            let work = async move {
                out.send("chunk".to_string()).await.ok();
                Ok("done".to_string())
            };
            Box::pin(work) as HandlerFuture
        }));

    let (sender, mut receiver) = StreamSender::channel();
    let outcome = handler.call_streaming("{}", sender).await;

    assert_eq!(outcome.as_deref(), Ok("done"));
    assert_eq!(receiver.recv().await.as_deref(), Some("chunk"));
}
/// `resolve_state_erased` returns the stored value for a registered type; the
/// type-erased result can be viewed as the concrete type again.
#[test]
fn test_resolve_state_erased_success() {
    let shared = state_map(99u64);

    let erased = resolve_state_erased(
        &shared,
        TypeId::of::<u64>(),
        std::any::type_name::<u64>(),
    )
    .unwrap();

    assert_eq!(erased.downcast_ref::<u64>(), Some(&99u64));
}
/// Looking up a type in an empty state map yields a "State not found" error
/// that names the requested type.
#[test]
fn test_resolve_state_erased_missing() {
    let empty = SharedStateMap::default();
    let requested = std::any::type_name::<String>();

    let message = resolve_state_erased(&empty, TypeId::of::<String>(), requested).unwrap_err();

    for fragment in ["State not found", requested] {
        assert!(message.contains(fragment));
    }
}
/// The `erase_handler_with_state!` macro wraps a named async function taking
/// state plus arguments into a callable erased handler.
#[tokio::test]
async fn test_erase_handler_with_state_macro() {
    async fn handler(state: State<Arc<String>>, _args: serde_json::Value) -> String {
        format!("got={}", *state)
    }

    let shared = state_map("macro-state".to_string());
    let erased = crate::erase_handler_with_state!(handler, String, serde_json::Value, shared);

    let outcome = erased.call("{}").await;

    assert_eq!(outcome.as_deref(), Ok("got=macro-state"));
}
/// The `erase_handler_with_state_only!` macro wraps a named async function
/// taking only state into a callable erased handler.
#[tokio::test]
async fn test_erase_handler_with_state_only_macro() {
    async fn handler(state: State<Arc<u32>>) -> String {
        format!("n={}", *state)
    }

    let shared = state_map(7u32);
    let erased = crate::erase_handler_with_state_only!(handler, u32, shared);

    let outcome = erased.call("{}").await;

    assert_eq!(outcome.as_deref(), Ok("n=7"));
}
/// The `erase_streaming_handler_with_state_only!` macro wraps a named async
/// function taking state and a sender into a callable streaming handler.
#[tokio::test]
async fn test_erase_streaming_handler_with_state_only_macro() {
    async fn handler(state: State<Arc<String>>, tx: StreamSender) -> String {
        let item = format!("from={}", *state);
        tx.send(item).await.ok();
        "done".to_string()
    }

    let shared = state_map("stream-state".to_string());
    let erased = crate::erase_streaming_handler_with_state_only!(handler, String, shared);

    let (sender, mut receiver) = StreamSender::channel();
    let outcome = erased.call_streaming("{}", sender).await;

    assert_eq!(outcome.as_deref(), Ok("done"));
    assert_eq!(receiver.recv().await.as_deref(), Some("from=stream-state"));
}
}