use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use crate::actor::ActorError;
use crate::message::{Headers, RuntimeHeaders};
use crate::node::{ActorId, NodeId};
/// The kind of send operation a message belongs to.
///
/// The value is carried by [`InboundContext`], [`OutboundContext`] and
/// [`DropNotice`] so that interceptors and observers can tell which kind of
/// send they are looking at. With the `serde` feature enabled the enum also
/// derives `Serialize` and `Deserialize`.
///
/// NOTE(review): the runtime that assigns these values is not part of this
/// file, so the per-variant notes below are inferred from the names only —
/// confirm against the sender implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum SendMode {
    /// Presumably a one-way send with no reply.
    Tell,
    /// Presumably a request that expects a reply.
    Ask,
    /// Presumably one request producing a stream of items.
    Expand,
    /// Presumably a stream of items folded into one result.
    Reduce,
    /// Presumably a stream of items mapped to another stream.
    Transform,
}
/// Read-only description of a delivery, handed to every
/// [`InboundInterceptor`] hook.
///
/// `'a` is the lifetime of the borrowed actor name; the context itself is
/// only ever passed to hooks by shared reference.
pub struct InboundContext<'a> {
    /// Identifier of the actor this delivery concerns (presumably the
    /// receiving actor — confirm against the caller).
    pub actor_id: ActorId,
    /// Name of that actor, borrowed for the duration of the call.
    pub actor_name: &'a str,
    /// Static name of the message type being delivered.
    pub message_type: &'static str,
    /// Which kind of send this delivery is part of.
    pub send_mode: SendMode,
    /// NOTE(review): presumably `true` when the message arrived from another
    /// node — the code that sets this flag is outside this file.
    pub remote: bool,
    /// Node associated with the origin of the message, when one is known.
    /// NOTE(review): presumably `None` for local sends — confirm.
    pub origin_node: Option<NodeId>,
}
/// Verdict returned by an interceptor hook.
///
/// [`Disposition::Continue`] is what every default hook in this file returns,
/// and it is the only verdict that lets
/// [`intercept_outbound_stream_item`] move on to the next interceptor; any
/// other verdict ends the chain and is reported to the caller.
///
/// NOTE(review): how the runtime acts on the non-`Continue` verdicts is
/// decided outside this file; the variant notes below are inferred from the
/// names and payload types.
#[derive(Debug)]
pub enum Disposition {
    /// Let the message proceed.
    Continue,
    /// Carries a duration — presumably how long to hold the message back.
    Delay(Duration),
    /// Presumably discard the message (see [`DropNotice`]).
    Drop,
    /// Carries a text payload — presumably the reason for refusing the message.
    Reject(String),
    /// Carries a duration — presumably how long to wait before trying again.
    Retry(Duration),
}
/// Result of running a message through a chain of interceptors.
///
/// Pairs the final [`Disposition`] with the name of the interceptor that
/// produced it. When no interceptor objected, the disposition is
/// [`Disposition::Continue`] and `interceptor_name` is the empty string.
///
/// Derives `Debug` (both fields are `Debug`) so results can be logged and
/// used in assertion messages.
#[derive(Debug)]
pub struct InterceptResult {
    /// The verdict that ended the chain, or `Continue` if none did.
    pub disposition: Disposition,
    /// `name()` of the interceptor that returned `disposition`; `""` when the
    /// whole chain continued.
    pub interceptor_name: &'static str,
}
impl InterceptResult {
    /// Builds the result that means "every interceptor let the message
    /// through": a `Continue` disposition and an empty interceptor name.
    pub fn continued() -> Self {
        let disposition = Disposition::Continue;
        Self {
            disposition,
            interceptor_name: "",
        }
    }

    /// Returns `true` only when the disposition is [`Disposition::Continue`].
    pub fn is_continue(&self) -> bool {
        // Listed exhaustively so that adding a variant forces a decision here.
        match self.disposition {
            Disposition::Continue => true,
            Disposition::Delay(_)
            | Disposition::Drop
            | Disposition::Reject(_)
            | Disposition::Retry(_) => false,
        }
    }
}
/// Details handed to a [`DropObserver`] about a message that was dropped.
///
/// NOTE(review): the code that builds notices is outside this file, so the
/// field notes below are inferred from field names and types — confirm
/// against the call sites.
#[derive(Debug)]
pub struct DropNotice {
    /// Owned name of the target the message was addressed to.
    pub target_name: String,
    /// Static name of the dropped message's type.
    pub message_type: &'static str,
    /// Name of the interceptor responsible for the drop.
    pub interceptor_name: &'static str,
    /// Which kind of send the dropped message belonged to.
    pub send_mode: SendMode,
    /// Free-form static label; presumably says where in the pipeline the
    /// drop happened.
    pub context: &'static str,
    /// Optional sequence number; presumably set when the dropped message was
    /// an item of a stream.
    pub seq: Option<u64>,
}
/// Callback interface for code that wants to hear about dropped messages.
///
/// Implementations must be `Send + Sync + 'static`; [`notify_drop`] calls them
/// through a shared `Arc<dyn DropObserver>`.
pub trait DropObserver: Send + Sync + 'static {
    /// Receives ownership of the notice describing one dropped message.
    fn on_drop(&self, notice: DropNotice);
}
/// Delivers `notice` to the drop observer, if one is configured.
///
/// When `observer` is `None` the notice is simply discarded.
pub fn notify_drop(observer: &Option<Arc<dyn DropObserver>>, notice: DropNotice) {
    // Nothing to do without an observer.
    let sink = match observer {
        Some(sink) => sink,
        None => return,
    };
    sink.on_drop(notice);
}
/// Runs one outbound stream item through the interceptor chain.
///
/// Interceptors are consulted in slice order via
/// [`OutboundInterceptor::on_expand_item`]. The first one that returns
/// anything other than [`Disposition::Continue`] ends the chain: its verdict
/// and its `name()` are returned, and later interceptors are not called.
/// If every interceptor continues (or the slice is empty) the result is
/// [`InterceptResult::continued`].
///
/// * `ctx` / `headers` — passed unchanged to every interceptor.
/// * `seq` — sequence number of the item, passed unchanged.
/// * `item` — the type-erased stream item.
pub fn intercept_outbound_stream_item(
    interceptors: &[Box<dyn OutboundInterceptor>],
    ctx: &OutboundContext<'_>,
    headers: &Headers,
    seq: u64,
    item: &dyn Any,
) -> InterceptResult {
    interceptors
        .iter()
        // `find_map` is lazy, so the chain stops at the first objection.
        .find_map(
            |candidate| match candidate.on_expand_item(ctx, headers, seq, item) {
                Disposition::Continue => None,
                verdict => Some(InterceptResult {
                    disposition: verdict,
                    interceptor_name: candidate.name(),
                }),
            },
        )
        .unwrap_or_else(InterceptResult::continued)
}
/// How the handling of a message ended, as reported to the completion hooks
/// ([`InboundInterceptor::on_complete`] and [`OutboundInterceptor::on_reply`]).
///
/// `'a` is the lifetime of the borrowed reply in [`Outcome::AskSuccess`].
pub enum Outcome<'a> {
    /// Success with no payload (named after [`SendMode::Tell`]).
    TellSuccess,
    /// Success carrying a reply (named after [`SendMode::Ask`]).
    AskSuccess {
        /// The type-erased reply value, borrowed from the caller.
        reply: &'a dyn Any,
    },
    /// Failure carrying the error that was raised.
    HandlerError {
        /// The error value.
        error: ActorError,
    },
    /// A stream that ran to completion.
    StreamCompleted {
        /// Number of items emitted by the stream.
        items_emitted: u64,
    },
    /// A stream that was cancelled.
    StreamCancelled {
        /// Number of items emitted by the stream.
        items_emitted: u64,
    },
}
impl<'a> std::fmt::Debug for Outcome<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TellSuccess => write!(f, "TellSuccess"),
Self::AskSuccess { .. } => write!(f, "AskSuccess"),
Self::HandlerError { error } => write!(f, "HandlerError({:?})", error),
Self::StreamCompleted { items_emitted } => {
write!(f, "StreamCompleted({})", items_emitted)
}
Self::StreamCancelled { items_emitted } => {
write!(f, "StreamCancelled({})", items_emitted)
}
}
}
}
/// Hook interface for code that inspects messages on the receiving side.
///
/// Only [`name`](InboundInterceptor::name) is mandatory. Every other hook has
/// a default that ignores its arguments and lets the message through, so an
/// implementation overrides just the hooks it cares about.
///
/// NOTE(review): when exactly the runtime calls each hook is decided outside
/// this file.
pub trait InboundInterceptor: Send + Sync + 'static {
    /// Static name identifying this interceptor.
    fn name(&self) -> &'static str;

    /// Hook for an incoming message.
    ///
    /// `headers` is mutable, so an implementation may modify it; `message` is
    /// the type-erased payload. Returns the verdict for the message.
    ///
    /// The default returns [`Disposition::Continue`].
    fn on_receive(
        &self,
        ctx: &InboundContext<'_>,
        runtime_headers: &RuntimeHeaders,
        headers: &mut Headers,
        message: &dyn Any,
    ) -> Disposition {
        // Arguments are discarded explicitly so the parameters keep their
        // documented names without triggering unused-variable warnings.
        let _ = ctx;
        let _ = runtime_headers;
        let _ = headers;
        let _ = message;
        Disposition::Continue
    }

    /// Hook that receives the [`Outcome`] of a message.
    ///
    /// Purely observational: it returns nothing. The default does nothing.
    fn on_complete(
        &self,
        ctx: &InboundContext<'_>,
        runtime_headers: &RuntimeHeaders,
        headers: &Headers,
        outcome: &Outcome<'_>,
    ) {
        let _ = ctx;
        let _ = runtime_headers;
        let _ = headers;
        let _ = outcome;
    }

    /// Hook for a single stream item; `seq` is the item's sequence number
    /// and `item` the type-erased value.
    ///
    /// The default returns [`Disposition::Continue`].
    fn on_expand_item(
        &self,
        ctx: &InboundContext<'_>,
        headers: &Headers,
        seq: u64,
        item: &dyn Any,
    ) -> Disposition {
        let _ = ctx;
        let _ = headers;
        let _ = seq;
        let _ = item;
        Disposition::Continue
    }
}
/// Read-only description of a send, handed to every
/// [`OutboundInterceptor`] hook.
///
/// `'a` is the lifetime of the borrowed target name.
pub struct OutboundContext<'a> {
    /// Identifier of the actor the message is addressed to.
    pub target_id: ActorId,
    /// Name of the target actor, borrowed for the duration of the call.
    pub target_name: &'a str,
    /// Static name of the message type being sent.
    pub message_type: &'static str,
    /// Which kind of send this is.
    pub send_mode: SendMode,
    /// NOTE(review): presumably `true` when the target lives on another
    /// node — the code that sets this flag is outside this file.
    pub remote: bool,
}
/// Hook interface for code that inspects messages on the sending side.
///
/// Only [`name`](OutboundInterceptor::name) is mandatory. Every other hook
/// has a default that ignores its arguments and lets the message through.
///
/// NOTE(review): apart from [`intercept_outbound_stream_item`], the code
/// that invokes these hooks is outside this file.
pub trait OutboundInterceptor: Send + Sync + 'static {
    /// Static name identifying this interceptor; reported in
    /// [`InterceptResult::interceptor_name`] when this interceptor ends a
    /// chain.
    fn name(&self) -> &'static str;

    /// Hook for an outgoing message.
    ///
    /// `headers` is mutable, so an implementation may modify it; `message` is
    /// the type-erased payload. Returns the verdict for the message.
    ///
    /// The default returns [`Disposition::Continue`].
    fn on_send(
        &self,
        ctx: &OutboundContext<'_>,
        runtime_headers: &RuntimeHeaders,
        headers: &mut Headers,
        message: &dyn Any,
    ) -> Disposition {
        // Arguments are discarded explicitly so the parameters keep their
        // documented names without triggering unused-variable warnings.
        let _ = ctx;
        let _ = runtime_headers;
        let _ = headers;
        let _ = message;
        Disposition::Continue
    }

    /// Hook that receives the [`Outcome`] of a send.
    ///
    /// Purely observational: it returns nothing. The default does nothing.
    fn on_reply(
        &self,
        ctx: &OutboundContext<'_>,
        runtime_headers: &RuntimeHeaders,
        headers: &Headers,
        outcome: &Outcome<'_>,
    ) {
        let _ = ctx;
        let _ = runtime_headers;
        let _ = headers;
        let _ = outcome;
    }

    /// Hook for a single stream item, invoked by
    /// [`intercept_outbound_stream_item`]; `seq` is the item's sequence
    /// number and `item` the type-erased value.
    ///
    /// Returning anything other than [`Disposition::Continue`] stops that
    /// chain. The default returns [`Disposition::Continue`].
    fn on_expand_item(
        &self,
        ctx: &OutboundContext<'_>,
        headers: &Headers,
        seq: u64,
        item: &dyn Any,
    ) -> Disposition {
        let _ = ctx;
        let _ = headers;
        let _ = seq;
        let _ = item;
        Disposition::Continue
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Interceptor that implements only the mandatory `name`, so every hook
    /// falls through to the trait's default implementation.
    struct NoopInterceptor;

    impl InboundInterceptor for NoopInterceptor {
        fn name(&self) -> &'static str {
            "noop"
        }
    }

    /// The default `on_receive` must let the message through.
    #[test]
    fn test_noop_interceptor_defaults() {
        let interceptor = NoopInterceptor;
        assert_eq!(interceptor.name(), "noop");

        let ctx = InboundContext {
            actor_id: ActorId {
                node: NodeId("n1".into()),
                local: 1,
            },
            actor_name: "test",
            message_type: "TestMsg",
            send_mode: SendMode::Tell,
            remote: false,
            origin_node: None,
        };
        let rh = RuntimeHeaders::new();
        let mut headers = Headers::new();
        let msg: u64 = 42;

        let disposition = interceptor.on_receive(&ctx, &rh, &mut headers, &msg);
        assert!(matches!(disposition, Disposition::Continue));
    }

    /// Equality on `SendMode` distinguishes the variants; every variant is
    /// touched at least once.
    #[test]
    fn test_send_mode_variants() {
        assert_eq!(SendMode::Tell, SendMode::Tell);
        assert_ne!(SendMode::Tell, SendMode::Ask);
        assert_ne!(SendMode::Expand, SendMode::Reduce);
        assert_ne!(SendMode::Reduce, SendMode::Transform);
    }

    /// Every `Disposition` variant can be constructed, including `Retry`.
    #[test]
    fn test_disposition_variants() {
        let _ = Disposition::Continue;
        let _ = Disposition::Delay(Duration::from_millis(100));
        let _ = Disposition::Drop;
        let _ = Disposition::Reject("forbidden".into());
        let _ = Disposition::Retry(Duration::from_millis(100));
    }

    /// Every `Outcome` variant can be constructed and renders through the
    /// hand-written `Debug` impl as expected.
    #[test]
    fn test_outcome_variants() {
        assert_eq!(format!("{:?}", Outcome::TellSuccess), "TellSuccess");

        let val = 42u64;
        assert_eq!(
            format!("{:?}", Outcome::AskSuccess { reply: &val }),
            "AskSuccess"
        );

        // The error's own Debug text is defined elsewhere, so only the
        // wrapper produced by `Outcome`'s impl is checked.
        let failed = Outcome::HandlerError {
            error: ActorError::internal("test"),
        };
        let rendered = format!("{:?}", failed);
        assert!(rendered.starts_with("HandlerError("));
        assert!(rendered.ends_with(')'));

        assert_eq!(
            format!("{:?}", Outcome::StreamCompleted { items_emitted: 10 }),
            "StreamCompleted(10)"
        );
        assert_eq!(
            format!("{:?}", Outcome::StreamCancelled { items_emitted: 5 }),
            "StreamCancelled(5)"
        );
    }
}