use std::any::Any;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use crate::actor::ActorError;
use crate::message::{Headers, RuntimeHeaders};
use crate::node::{ActorId, NodeId};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum SendMode {
Tell,
Ask,
Expand,
Reduce,
Transform,
}
pub struct InboundContext<'a> {
pub actor_id: ActorId,
pub actor_name: &'a str,
pub message_type: &'static str,
pub send_mode: SendMode,
pub remote: bool,
pub origin_node: Option<NodeId>,
}
#[derive(Debug)]
pub enum Disposition {
Continue,
Delay(Duration),
Drop,
Reject(String),
Retry(Duration),
}
pub struct InterceptResult {
pub disposition: Disposition,
pub interceptor_name: &'static str,
}
impl InterceptResult {
pub fn continued() -> Self {
Self {
disposition: Disposition::Continue,
interceptor_name: "",
}
}
pub fn is_continue(&self) -> bool {
matches!(self.disposition, Disposition::Continue)
}
}
#[derive(Debug)]
pub struct DropNotice {
pub target_name: String,
pub message_type: &'static str,
pub interceptor_name: &'static str,
pub send_mode: SendMode,
pub context: &'static str,
pub seq: Option<u64>,
}
pub trait DropObserver: Send + Sync + 'static {
fn on_drop(&self, notice: DropNotice);
}
pub fn notify_drop(observer: &Option<Arc<dyn DropObserver>>, notice: DropNotice) {
if let Some(obs) = observer {
obs.on_drop(notice);
}
}
pub fn intercept_outbound_stream_item(
interceptors: &[Box<dyn OutboundInterceptor>],
ctx: &OutboundContext<'_>,
headers: &Headers,
seq: u64,
item: &dyn Any,
) -> InterceptResult {
for interceptor in interceptors {
let d = interceptor.on_expand_item(ctx, headers, seq, item);
if !matches!(d, Disposition::Continue) {
return InterceptResult {
disposition: d,
interceptor_name: interceptor.name(),
};
}
}
InterceptResult::continued()
}
pub enum Outcome<'a> {
TellSuccess,
AskSuccess {
reply: &'a dyn Any,
},
HandlerError {
error: ActorError,
},
StreamCompleted {
items_emitted: u64,
},
StreamCancelled {
items_emitted: u64,
},
}
impl<'a> std::fmt::Debug for Outcome<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TellSuccess => write!(f, "TellSuccess"),
Self::AskSuccess { .. } => write!(f, "AskSuccess"),
Self::HandlerError { error } => write!(f, "HandlerError({:?})", error),
Self::StreamCompleted { items_emitted } => {
write!(f, "StreamCompleted({})", items_emitted)
}
Self::StreamCancelled { items_emitted } => {
write!(f, "StreamCancelled({})", items_emitted)
}
}
}
}
pub type HandlerWrapper<'a> = Box<
dyn FnOnce(Pin<Box<dyn Future<Output = ()> + Send + 'a>>)
-> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
+ Send
+ 'a,
>;
pub trait InboundInterceptor: Send + Sync + 'static {
fn name(&self) -> &'static str;
fn on_receive(
&self,
ctx: &InboundContext<'_>,
runtime_headers: &RuntimeHeaders,
headers: &mut Headers,
message: &dyn Any,
) -> Disposition {
let _ = (ctx, runtime_headers, headers, message);
Disposition::Continue
}
fn on_complete(
&self,
ctx: &InboundContext<'_>,
runtime_headers: &RuntimeHeaders,
headers: &Headers,
outcome: &Outcome<'_>,
) {
let _ = (ctx, runtime_headers, headers, outcome);
}
fn on_expand_item(
&self,
ctx: &InboundContext<'_>,
headers: &Headers,
seq: u64,
item: &dyn Any,
) -> Disposition {
let _ = (ctx, headers, seq, item);
Disposition::Continue
}
fn wrap_handler<'a>(
&'a self,
ctx: &InboundContext<'_>,
headers: &Headers,
) -> Option<HandlerWrapper<'a>> {
let _ = (ctx, headers);
None
}
}
pub struct OutboundContext<'a> {
pub target_id: ActorId,
pub target_name: &'a str,
pub message_type: &'static str,
pub send_mode: SendMode,
pub remote: bool,
}
pub trait OutboundInterceptor: Send + Sync + 'static {
fn name(&self) -> &'static str;
fn on_send(
&self,
ctx: &OutboundContext<'_>,
runtime_headers: &RuntimeHeaders,
headers: &mut Headers,
message: &dyn Any,
) -> Disposition {
let _ = (ctx, runtime_headers, headers, message);
Disposition::Continue
}
fn on_reply(
&self,
ctx: &OutboundContext<'_>,
runtime_headers: &RuntimeHeaders,
headers: &Headers,
outcome: &Outcome<'_>,
) {
let _ = (ctx, runtime_headers, headers, outcome);
}
fn on_expand_item(
&self,
ctx: &OutboundContext<'_>,
headers: &Headers,
seq: u64,
item: &dyn Any,
) -> Disposition {
let _ = (ctx, headers, seq, item);
Disposition::Continue
}
}
pub fn collect_handler_wrappers<'a>(
interceptors: &'a [Box<dyn InboundInterceptor>],
ctx: &InboundContext<'_>,
headers: &Headers,
) -> Vec<Option<HandlerWrapper<'a>>> {
interceptors
.iter()
.map(|i| i.wrap_handler(ctx, headers))
.collect()
}
pub fn apply_handler_wrappers<'a>(
wrappers: Vec<Option<HandlerWrapper<'a>>>,
inner: Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
wrappers
.into_iter()
.rev()
.fold(inner, |next, wrapper| match wrapper {
Some(w) => w(next),
None => next,
})
}
#[cfg(test)]
mod tests {
use super::*;
struct NoopInterceptor;
impl InboundInterceptor for NoopInterceptor {
fn name(&self) -> &'static str {
"noop"
}
}
#[test]
fn test_noop_interceptor_defaults() {
let interceptor = NoopInterceptor;
assert_eq!(interceptor.name(), "noop");
let ctx = InboundContext {
actor_id: ActorId {
node: NodeId("n1".into()),
local: 1,
},
actor_name: "test",
message_type: "TestMsg",
send_mode: SendMode::Tell,
remote: false,
origin_node: None,
};
let rh = RuntimeHeaders::new();
let mut headers = Headers::new();
let msg: u64 = 42;
let disposition = interceptor.on_receive(&ctx, &rh, &mut headers, &msg);
assert!(matches!(disposition, Disposition::Continue));
}
#[test]
fn test_send_mode_variants() {
assert_eq!(SendMode::Tell, SendMode::Tell);
assert_ne!(SendMode::Tell, SendMode::Ask);
assert_ne!(SendMode::Expand, SendMode::Reduce);
}
#[test]
fn test_disposition_variants() {
let _ = Disposition::Continue;
let _ = Disposition::Delay(Duration::from_millis(100));
let _ = Disposition::Drop;
let _ = Disposition::Reject("forbidden".into());
}
#[test]
fn test_outcome_variants() {
let _ = Outcome::TellSuccess;
let val = 42u64;
let _ = Outcome::AskSuccess { reply: &val };
let _ = Outcome::HandlerError {
error: ActorError::internal("test"),
};
let _ = Outcome::StreamCompleted { items_emitted: 10 };
let _ = Outcome::StreamCancelled { items_emitted: 5 };
}
#[test]
fn test_wrap_handler_default_returns_none() {
let interceptor = NoopInterceptor;
let ctx = InboundContext {
actor_id: ActorId {
node: NodeId("n1".into()),
local: 1,
},
actor_name: "test",
message_type: "TestMsg",
send_mode: SendMode::Tell,
remote: false,
origin_node: None,
};
let headers = Headers::new();
assert!(interceptor.wrap_handler(&ctx, &headers).is_none());
}
struct WrappingInterceptor;
impl InboundInterceptor for WrappingInterceptor {
fn name(&self) -> &'static str {
"wrapping"
}
fn wrap_handler<'a>(
&'a self,
_ctx: &InboundContext<'_>,
_headers: &Headers,
) -> Option<HandlerWrapper<'a>> {
Some(Box::new(|next| {
Box::pin(async move {
next.await;
})
}))
}
}
#[test]
fn test_wrap_handler_returns_some() {
let interceptor = WrappingInterceptor;
let ctx = InboundContext {
actor_id: ActorId {
node: NodeId("n1".into()),
local: 1,
},
actor_name: "test",
message_type: "TestMsg",
send_mode: SendMode::Tell,
remote: false,
origin_node: None,
};
let headers = Headers::new();
assert!(interceptor.wrap_handler(&ctx, &headers).is_some());
}
#[test]
fn test_collect_handler_wrappers_mixed() {
let interceptors: Vec<Box<dyn InboundInterceptor>> = vec![
Box::new(NoopInterceptor),
Box::new(WrappingInterceptor),
Box::new(NoopInterceptor),
];
let ctx = InboundContext {
actor_id: ActorId {
node: NodeId("n1".into()),
local: 1,
},
actor_name: "test",
message_type: "TestMsg",
send_mode: SendMode::Tell,
remote: false,
origin_node: None,
};
let headers = Headers::new();
let wrappers = collect_handler_wrappers(&interceptors, &ctx, &headers);
assert_eq!(wrappers.len(), 3);
assert!(wrappers[0].is_none());
assert!(wrappers[1].is_some());
assert!(wrappers[2].is_none());
}
#[tokio::test]
async fn test_apply_handler_wrappers_executes_inner() {
use std::sync::atomic::{AtomicBool, Ordering};
let executed = Arc::new(AtomicBool::new(false));
let executed_clone = executed.clone();
let inner: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
executed_clone.store(true, Ordering::SeqCst);
});
let wrapped = apply_handler_wrappers(vec![], inner);
wrapped.await;
assert!(executed.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_apply_handler_wrappers_nesting_order() {
use std::sync::atomic::{AtomicU32, Ordering};
let order = Arc::new(AtomicU32::new(0));
let order1 = order.clone();
let order2 = order.clone();
let order_inner = order.clone();
let w1: HandlerWrapper<'_> = Box::new(move |next| {
Box::pin(async move {
let val = order1.fetch_add(1, Ordering::SeqCst);
assert_eq!(val, 0, "wrapper 1 should enter first");
next.await;
})
});
let w2: HandlerWrapper<'_> = Box::new(move |next| {
Box::pin(async move {
let val = order2.fetch_add(1, Ordering::SeqCst);
assert_eq!(val, 1, "wrapper 2 should enter second");
next.await;
})
});
let inner: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
let val = order_inner.fetch_add(1, Ordering::SeqCst);
assert_eq!(val, 2, "inner should execute last");
});
let wrappers = vec![Some(w1), Some(w2)];
let wrapped = apply_handler_wrappers(wrappers, inner);
wrapped.await;
assert_eq!(order.load(Ordering::SeqCst), 3);
}
}