Skip to main content

dactor/
actor.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use async_trait::async_trait;
8use tokio::sync::oneshot;
9use tokio_util::sync::CancellationToken;
10
11use crate::errors::{ActorSendError, ErrorAction, ErrorCode, RuntimeError};
12use crate::interceptor::SendMode;
13use crate::mailbox::MailboxConfig;
14use crate::message::{Headers, Message};
15use crate::node::ActorId;
16use crate::stream::{BatchConfig, BoxStream, StreamReceiver, StreamSender};
17
18/// An error originating from within an actor's handler or lifecycle hook.
19///
20/// Structured error type for actor handler failures.
21/// Inspired by gRPC status codes — carries a code, message, optional details,
22/// and an error chain for causal tracing.
23#[derive(Debug, Clone)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct ActorError {
26    /// The error category (e.g., Internal, InvalidArgument, NotFound).
27    pub code: ErrorCode,
28    /// Human-readable error description.
29    pub message: String,
30    /// Optional structured details (JSON string or domain-specific data).
31    pub details: Option<String>,
32    /// Causal chain — the error that caused this one.
33    pub cause: Option<Box<ActorError>>,
34}
35
36impl ActorError {
37    /// Create a simple error with code and message.
38    pub fn new(code: ErrorCode, message: impl Into<String>) -> Self {
39        Self {
40            code,
41            message: message.into(),
42            details: None,
43            cause: None,
44        }
45    }
46
47    /// Create an internal error (most common for unexpected failures).
48    pub fn internal(message: impl Into<String>) -> Self {
49        Self::new(ErrorCode::Internal, message)
50    }
51
52    /// Add details to the error.
53    pub fn with_details(mut self, details: impl Into<String>) -> Self {
54        self.details = Some(details.into());
55        self
56    }
57
58    /// Chain a cause to this error.
59    pub fn with_cause(mut self, cause: ActorError) -> Self {
60        self.cause = Some(Box::new(cause));
61        self
62    }
63}
64
65impl fmt::Display for ActorError {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        write!(f, "[{:?}] {}", self.code, self.message)?;
68        if let Some(ref details) = self.details {
69            write!(f, " ({})", details)?;
70        }
71        if let Some(ref cause) = self.cause {
72            write!(f, " caused by: {}", cause)?;
73        }
74        Ok(())
75    }
76}
77
78impl std::error::Error for ActorError {}
79
80/// Context passed to actor lifecycle hooks and handlers.
81///
82/// **Note:** `ActorContext` is not `Clone` because `Headers` contains type-erased
83/// header values (`Box<dyn HeaderValue>`) which are not cloneable. This is an
84/// intentional v0.2 API change — handlers receive `&mut ActorContext` and should
85/// extract needed values rather than cloning the context.
86#[derive(Debug)]
87pub struct ActorContext {
88    /// Unique identity of the actor.
89    pub actor_id: ActorId,
90    /// Human-readable name of the actor.
91    pub actor_name: String,
92    /// How the current message was sent (Tell, Ask, Expand, Reduce, Transform).
93    /// `None` during on_start/on_stop (no message being processed).
94    pub send_mode: Option<SendMode>,
95    /// Headers attached to the current message.
96    /// Empty during on_start/on_stop.
97    pub headers: Headers,
98    /// Cancellation token for the current request. `None` if no cancellation was requested.
99    pub(crate) cancellation_token: Option<CancellationToken>,
100}
101
102impl ActorContext {
103    /// Create a new `ActorContext` for an actor.
104    ///
105    /// Used by runtime adapters to construct the context before calling
106    /// lifecycle hooks and handlers.
107    pub fn new(actor_id: ActorId, actor_name: String) -> Self {
108        Self {
109            actor_id,
110            actor_name,
111            send_mode: None,
112            headers: Headers::new(),
113            cancellation_token: None,
114        }
115    }
116
117    /// Returns a future that completes when the current request is cancelled.
118    /// Use in `tokio::select!` to cooperatively check for cancellation:
119    ///
120    /// ```ignore
121    /// tokio::select! {
122    ///     result = some_long_operation() => { /* use result */ }
123    ///     _ = ctx.cancelled() => { return Err(ActorError::internal("cancelled")); }
124    /// }
125    /// ```
126    ///
127    /// Returns `futures::future::pending()` if no cancellation token is set,
128    /// meaning the branch will never trigger.
129    pub async fn cancelled(&self) {
130        match &self.cancellation_token {
131            Some(token) => token.cancelled().await,
132            None => futures::future::pending().await,
133        }
134    }
135
136    /// Set the cancellation token for the current message.
137    /// Used by runtime adapters to propagate cancellation into handlers.
138    pub fn set_cancellation_token(&mut self, token: Option<CancellationToken>) {
139        self.cancellation_token = token;
140    }
141}
142
143/// The core actor trait. Implemented by the user's actor struct.
144/// State lives in `self`. Lifecycle hooks have default no-ops.
145#[async_trait]
146pub trait Actor: Send + 'static {
147    /// Serializable construction arguments for creating this actor.
148    type Args: Send + 'static;
149
150    /// Non-serializable local dependencies, resolved at the target node.
151    type Deps: Send + 'static;
152
153    /// Construct the actor from serializable args and local deps.
154    ///
155    /// Called by the runtime during spawn. Perform only synchronous
156    /// initialization here; use `on_start` for async setup.
157    fn create(args: Self::Args, deps: Self::Deps) -> Self
158    where
159        Self: Sized;
160
161    /// Called after spawn, before any messages. Default: no-op.
162    /// Use for async initialization, resource acquisition, subscriptions.
163    async fn on_start(&mut self, _ctx: &mut ActorContext) {}
164
165    /// Called when the actor is stopping. Default: no-op.
166    /// Use for cleanup, resource release, flushing buffers.
167    async fn on_stop(&mut self) {}
168
169    /// Called on handler error/panic. Returns what to do next.
170    fn on_error(&mut self, _error: &ActorError) -> ErrorAction {
171        ErrorAction::Stop
172    }
173}
174
175/// Implemented by an actor for each message type it can handle.
176/// One impl per (Actor, Message) pair. Sequential execution guaranteed.
177#[async_trait]
178pub trait Handler<M: Message>: Actor {
179    /// Handle the message and return a reply.
180    async fn handle(&mut self, msg: M, ctx: &mut ActorContext) -> M::Reply;
181}
182
183/// Implemented by actors that handle expand (server-streaming) requests.
184/// The handler receives the request and a `StreamSender` to push items into.
185/// When this method returns, the stream closes on the caller side.
186///
187/// Generic parameters:
188/// - `M` — the request message type.
189/// - `OutputItem` — the type of items pushed into the output stream.
190#[async_trait]
191pub trait ExpandHandler<M, OutputItem: Send + 'static>: Actor
192where
193    M: Send + 'static,
194{
195    /// Handle an expand request. Push items into `sender`.
196    /// When this method returns, the stream closes.
197    async fn handle_expand(
198        &mut self,
199        msg: M,
200        sender: StreamSender<OutputItem>,
201        ctx: &mut ActorContext,
202    );
203}
204
205/// Implemented by actors that handle reduce (client-streaming) requests.
206///
207/// The handler receives a [`StreamReceiver`] from which it pulls
208/// caller-provided items. When the stream ends, the handler returns a
209/// final reply.
210///
211/// Generic parameters:
212/// - `InputItem` — the type of items the caller streams to the actor.
213/// - `Reply` — the type returned after the actor consumes all items.
214#[async_trait]
215pub trait ReduceHandler<InputItem: Send + 'static, Reply: Send + 'static>: Actor {
216    /// Handle a reduce request. Pull items from `receiver` and return a reply.
217    async fn handle_reduce(
218        &mut self,
219        receiver: StreamReceiver<InputItem>,
220        ctx: &mut ActorContext,
221    ) -> Reply;
222}
223
224/// Implemented by actors that handle transform (N→M) requests.
225///
226/// Receives items from an input stream and produces items to an output stream.
227/// For each input item, the handler may push zero or more output items via the
228/// [`StreamSender`]. When the input stream ends, [`on_transform_complete`] is
229/// called to allow final items to be emitted.
230///
231/// Generic parameters:
232/// - `InputItem` — the type of items the caller streams to the actor.
233/// - `OutputItem` — the type of items the actor pushes to the output stream.
234#[async_trait]
235pub trait TransformHandler<InputItem: Send + 'static, OutputItem: Send + 'static>: Actor {
236    /// Handle one input item. Push zero or more output items via `sender`.
237    async fn handle_transform(
238        &mut self,
239        item: InputItem,
240        sender: &StreamSender<OutputItem>,
241        ctx: &mut ActorContext,
242    );
243
244    /// Called when the input stream ends. Optionally push final items.
245    async fn on_transform_complete(
246        &mut self,
247        sender: &StreamSender<OutputItem>,
248        ctx: &mut ActorContext,
249    ) {
250        let _ = (sender, ctx);
251    }
252}
253
254/// A future that resolves to the reply from an `ask()` call.
255///
256/// Wraps a `oneshot::Receiver` and implements `Future` so that callers can
257/// `.await` the reply directly: `let count = actor.ask(GetCount, None)?.await?;`
258pub struct AskReply<R> {
259    rx: oneshot::Receiver<Result<R, RuntimeError>>,
260}
261
262impl<R> AskReply<R> {
263    /// Wrap a oneshot receiver into an `AskReply` future.
264    ///
265    /// Typically called by the runtime, not by user code.
266    pub fn new(rx: oneshot::Receiver<Result<R, RuntimeError>>) -> Self {
267        Self { rx }
268    }
269}
270
271impl<R> Future for AskReply<R> {
272    type Output = Result<R, RuntimeError>;
273
274    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275        match Pin::new(&mut self.rx).poll(cx) {
276            Poll::Ready(Ok(Ok(reply))) => Poll::Ready(Ok(reply)),
277            Poll::Ready(Ok(Err(error))) => Poll::Ready(Err(error)),
278            Poll::Ready(Err(_)) => Poll::Ready(Err(RuntimeError::ActorNotFound(
279                "reply channel closed — actor may have stopped, panicked, or the request was cancelled".into(),
280            ))),
281            Poll::Pending => Poll::Pending,
282        }
283    }
284}
285
286/// A reference to a running actor of type `A`.
287///
288/// `ActorRef<A>` is typed to the actor struct. This enables
289/// sending any message type `M` where `A: Handler<M>`.
290pub trait ActorRef<A: Actor>: Clone + Send + Sync + 'static {
291    /// The actor's unique identity.
292    fn id(&self) -> ActorId;
293
294    /// The actor's name (as given to spawn).
295    fn name(&self) -> String;
296
297    /// Check if the actor is still alive.
298    fn is_alive(&self) -> bool;
299
300    /// Approximate number of messages pending in the actor's mailbox.
301    ///
302    /// This is a best-effort snapshot that may be stale immediately after
303    /// reading.  Used by [`PoolRouting::LeastLoaded`](crate::pool::PoolRouting::LeastLoaded)
304    /// to route to the least-busy worker.  Returns `0` by default;
305    /// adapters that can query their mailbox depth should override this.
306    fn pending_messages(&self) -> usize {
307        0
308    }
309
310    /// Gracefully stop the actor. Closes the mailbox and triggers on_stop.
311    fn stop(&self);
312
313    /// Fire-and-forget: deliver a message to the actor.
314    /// The message must have `Reply = ()` (no reply expected).
315    fn tell<M>(&self, msg: M) -> Result<(), ActorSendError>
316    where
317        A: Handler<M>,
318        M: Message<Reply = ()>;
319
320    /// Request-reply: send a message and await the reply.
321    ///
322    /// Returns an [`AskReply`] future that resolves to the handler's reply.
323    /// Usage: `let reply = actor.ask(msg, None)?.await?;`
324    ///
325    /// Pass a [`CancellationToken`] to cooperatively cancel the operation.
326    fn ask<M>(
327        &self,
328        msg: M,
329        cancel: Option<CancellationToken>,
330    ) -> Result<AskReply<M::Reply>, ActorSendError>
331    where
332        A: Handler<M>,
333        M: Message;
334
335    /// Request-stream: send a request and receive a stream of responses.
336    /// `buffer` controls the channel capacity (backpressure).
337    ///
338    /// Pass `batch_config` to enable batching (reduces per-item overhead
339    /// for remote actors). `None` means unbatched per-item delivery.
340    ///
341    /// Pass a [`CancellationToken`] to cooperatively cancel the stream.
342    fn expand<M, OutputItem>(
343        &self,
344        msg: M,
345        buffer: usize,
346        batch_config: Option<BatchConfig>,
347        cancel: Option<CancellationToken>,
348    ) -> Result<BoxStream<OutputItem>, ActorSendError>
349    where
350        A: ExpandHandler<M, OutputItem>,
351        M: Send + 'static,
352        OutputItem: Send + 'static;
353
354    /// Client-streaming (feed): stream items to the actor and receive a reply.
355    ///
356    /// The caller provides items via `input`. The actor consumes them via
357    /// [`StreamReceiver`] and returns a single reply when the stream ends.
358    /// `buffer` controls the internal channel capacity (backpressure).
359    ///
360    /// Pass `batch_config` to enable batching (reduces per-item overhead
361    /// for remote actors). `None` means unbatched per-item delivery.
362    ///
363    /// Pass a [`CancellationToken`] to cooperatively cancel the feed.
364    ///
365    /// Usage: `let reply = actor.reduce::<u64, u64>(input, 8, None, None)?.await?;`
366    fn reduce<InputItem, Reply>(
367        &self,
368        input: BoxStream<InputItem>,
369        buffer: usize,
370        batch_config: Option<BatchConfig>,
371        cancel: Option<CancellationToken>,
372    ) -> Result<AskReply<Reply>, ActorSendError>
373    where
374        A: ReduceHandler<InputItem, Reply>,
375        InputItem: Send + 'static,
376        Reply: Send + 'static;
377
378    /// Transform: stream items to the actor and receive a stream of outputs.
379    ///
380    /// The caller provides items via `input`. For each input item the actor's
381    /// [`TransformHandler::handle_transform`] may push zero or more output items.
382    /// When the input stream ends, [`TransformHandler::on_transform_complete`]
383    /// is called to allow final items to be emitted.
384    ///
385    /// `buffer` controls the internal channel capacity (backpressure).
386    ///
387    /// Pass `batch_config` to enable batching on the output stream.  Batching
388    /// groups output items into vectors by `max_items` or `max_delay`,
389    /// amortizing serialization and network costs for remote actors.
390    /// `None` means unbatched per-item delivery.
391    ///
392    /// Pass a [`CancellationToken`] to cooperatively cancel the transform.
393    ///
394    /// Usage:
395    /// ```ignore
396    /// let output: BoxStream<String> = actor.transform::<i32, String>(input, 8, None, None)?;
397    /// ```
398    fn transform<InputItem, OutputItem>(
399        &self,
400        input: BoxStream<InputItem>,
401        buffer: usize,
402        batch_config: Option<BatchConfig>,
403        cancel: Option<CancellationToken>,
404    ) -> Result<BoxStream<OutputItem>, ActorSendError>
405    where
406        A: TransformHandler<InputItem, OutputItem>,
407        InputItem: Send + 'static,
408        OutputItem: Send + 'static;
409}
410
411/// Create a [`CancellationToken`] that automatically cancels after the given duration.
412pub fn cancel_after(duration: Duration) -> CancellationToken {
413    let token = CancellationToken::new();
414    let child = token.clone();
415    tokio::spawn(async move {
416        tokio::time::sleep(duration).await;
417        child.cancel();
418    });
419    token
420}
421
422/// Configuration for spawning an actor.
423///
424/// Default: unbounded mailbox, local spawn. Set `target_node` to spawn
425/// on a remote node — the runtime serializes `Args` and sends a
426/// `SpawnRequest` to the target's `SpawnManager`.
427///
428/// If the target node is unreachable, `spawn_with_config()` returns
429/// `Err(RuntimeError::Send(...))`.
430#[derive(Debug, Clone, Default)]
431#[non_exhaustive]
432pub struct SpawnConfig {
433    /// Mailbox configuration (unbounded by default).
434    pub mailbox: MailboxConfig,
435    /// Target node for remote spawn. `None` = spawn on local node (default).
436    /// When set, the runtime serializes the actor's `Args` and sends a
437    /// `SpawnRequest` to the target node's `SpawnManager`.
438    pub target_node: Option<crate::node::NodeId>,
439}
440
441#[cfg(test)]
442mod tests {
443    use super::*;
444    use crate::errors::ErrorAction;
445    use crate::message::Message;
446    use crate::node::{ActorId, NodeId};
447
448    // Test: Actor trait compiles with a simple Counter
449    struct Counter {
450        count: u64,
451    }
452
453    impl Actor for Counter {
454        type Args = Self;
455        type Deps = ();
456
457        fn create(args: Self, _deps: ()) -> Self {
458            args
459        }
460    }
461
462    // ── Message definitions ───────────────────────────
463    struct Increment(u64);
464    impl Message for Increment {
465        type Reply = ();
466    }
467
468    struct GetCount;
469    impl Message for GetCount {
470        type Reply = u64;
471    }
472
473    struct Reset;
474    impl Message for Reset {
475        type Reply = u64;
476    }
477
478    // ── Handler implementations ───────────────────────
479    #[async_trait]
480    impl Handler<Increment> for Counter {
481        async fn handle(&mut self, msg: Increment, _ctx: &mut ActorContext) {
482            self.count += msg.0;
483        }
484    }
485
486    #[async_trait]
487    impl Handler<GetCount> for Counter {
488        async fn handle(&mut self, _msg: GetCount, _ctx: &mut ActorContext) -> u64 {
489            self.count
490        }
491    }
492
493    #[async_trait]
494    impl Handler<Reset> for Counter {
495        async fn handle(&mut self, _msg: Reset, _ctx: &mut ActorContext) -> u64 {
496            let old = self.count;
497            self.count = 0;
498            old
499        }
500    }
501
502    #[test]
503    fn test_counter_actor_compiles() {
504        let counter = Counter::create(Counter { count: 0 }, ());
505        assert_eq!(counter.count, 0);
506    }
507
508    #[test]
509    fn test_actor_default_on_error_returns_stop() {
510        let mut counter = Counter { count: 0 };
511        let action = counter.on_error(&ActorError::internal("test error"));
512        assert_eq!(action, ErrorAction::Stop);
513    }
514
515    // Test: Actor with custom Args and Deps
516    struct WorkerArgs {
517        name: String,
518    }
519    struct WorkerDeps {
520        multiplier: u64,
521    }
522    struct Worker {
523        name: String,
524        multiplier: u64,
525    }
526
527    impl Actor for Worker {
528        type Args = WorkerArgs;
529        type Deps = WorkerDeps;
530
531        fn create(args: WorkerArgs, deps: WorkerDeps) -> Self {
532            Worker {
533                name: args.name,
534                multiplier: deps.multiplier,
535            }
536        }
537    }
538
539    #[test]
540    fn test_worker_actor_with_deps() {
541        let worker = Worker::create(
542            WorkerArgs { name: "w1".into() },
543            WorkerDeps { multiplier: 10 },
544        );
545        assert_eq!(worker.name, "w1");
546        assert_eq!(worker.multiplier, 10);
547    }
548
549    // Test: ActorId
550    #[test]
551    fn test_actor_id_display() {
552        let id = ActorId {
553            node: NodeId("node-1".into()),
554            local: 42,
555        };
556        assert_eq!(format!("{}", id), "Actor(node-1/42)");
557    }
558
559    #[test]
560    fn test_actor_id_equality() {
561        let id1 = ActorId {
562            node: NodeId("n1".into()),
563            local: 1,
564        };
565        let id2 = ActorId {
566            node: NodeId("n1".into()),
567            local: 1,
568        };
569        let id3 = ActorId {
570            node: NodeId("n1".into()),
571            local: 2,
572        };
573        assert_eq!(id1, id2);
574        assert_ne!(id1, id3);
575    }
576
577    #[test]
578    fn test_actor_id_clone() {
579        let id = ActorId {
580            node: NodeId("n1".into()),
581            local: 1,
582        };
583        let cloned = id.clone();
584        assert_eq!(id, cloned);
585    }
586
587    // Test: ErrorAction variants
588    #[test]
589    fn test_error_action_variants() {
590        assert_eq!(ErrorAction::Resume, ErrorAction::Resume);
591        assert_eq!(ErrorAction::Restart, ErrorAction::Restart);
592        assert_eq!(ErrorAction::Stop, ErrorAction::Stop);
593        assert_eq!(ErrorAction::Escalate, ErrorAction::Escalate);
594        assert_ne!(ErrorAction::Resume, ErrorAction::Stop);
595    }
596
597    // Test: SpawnConfig default
598    #[test]
599    fn test_spawn_config_default() {
600        let config = SpawnConfig::default();
601        assert!(config.target_node.is_none());
602    }
603
604    #[test]
605    fn test_spawn_config_with_target_node() {
606        let config = SpawnConfig {
607            target_node: Some(NodeId("node-3".into())),
608            ..Default::default()
609        };
610        assert_eq!(config.target_node.unwrap().0, "node-3");
611    }
612
613    // Test: ActorContext
614    #[test]
615    fn test_actor_context_fields() {
616        let ctx = ActorContext {
617            actor_id: ActorId {
618                node: NodeId("n1".into()),
619                local: 1,
620            },
621            actor_name: "test-actor".into(),
622            send_mode: None,
623            headers: Headers::new(),
624            cancellation_token: None,
625        };
626        assert_eq!(ctx.actor_name, "test-actor");
627        assert_eq!(ctx.actor_id.local, 1);
628    }
629
630    // Test: on_start / on_stop defaults are no-ops
631    #[tokio::test]
632    async fn test_lifecycle_defaults_are_noop() {
633        let mut counter = Counter { count: 42 };
634        let mut ctx = ActorContext {
635            actor_id: ActorId {
636                node: NodeId("n1".into()),
637                local: 1,
638            },
639            actor_name: "counter".into(),
640            send_mode: None,
641            headers: Headers::new(),
642            cancellation_token: None,
643        };
644        counter.on_start(&mut ctx).await;
645        counter.on_stop().await;
646        assert_eq!(counter.count, 42);
647    }
648
649    // ── Handler tests ─────────────────────────────────
650
651    #[tokio::test]
652    async fn test_handler_increment() {
653        let mut counter = Counter { count: 0 };
654        let mut ctx = ActorContext {
655            actor_id: ActorId {
656                node: NodeId("n1".into()),
657                local: 1,
658            },
659            actor_name: "counter".into(),
660            send_mode: None,
661            headers: Headers::new(),
662            cancellation_token: None,
663        };
664        counter.handle(Increment(5), &mut ctx).await;
665        assert_eq!(counter.count, 5);
666        counter.handle(Increment(3), &mut ctx).await;
667        assert_eq!(counter.count, 8);
668    }
669
670    #[tokio::test]
671    async fn test_handler_get_count() {
672        let mut counter = Counter { count: 42 };
673        let mut ctx = ActorContext {
674            actor_id: ActorId {
675                node: NodeId("n1".into()),
676                local: 1,
677            },
678            actor_name: "counter".into(),
679            send_mode: None,
680            headers: Headers::new(),
681            cancellation_token: None,
682        };
683        let count = counter.handle(GetCount, &mut ctx).await;
684        assert_eq!(count, 42);
685    }
686
687    #[tokio::test]
688    async fn test_handler_reset() {
689        let mut counter = Counter { count: 100 };
690        let mut ctx = ActorContext {
691            actor_id: ActorId {
692                node: NodeId("n1".into()),
693                local: 1,
694            },
695            actor_name: "counter".into(),
696            send_mode: None,
697            headers: Headers::new(),
698            cancellation_token: None,
699        };
700        let old = counter.handle(Reset, &mut ctx).await;
701        assert_eq!(old, 100);
702        assert_eq!(counter.count, 0);
703    }
704
705    #[tokio::test]
706    async fn test_multiple_handlers_on_same_actor() {
707        let mut counter = Counter { count: 0 };
708        let mut ctx = ActorContext {
709            actor_id: ActorId {
710                node: NodeId("n1".into()),
711                local: 1,
712            },
713            actor_name: "counter".into(),
714            send_mode: None,
715            headers: Headers::new(),
716            cancellation_token: None,
717        };
718
719        counter.handle(Increment(10), &mut ctx).await;
720        counter.handle(Increment(20), &mut ctx).await;
721
722        let count = counter.handle(GetCount, &mut ctx).await;
723        assert_eq!(count, 30);
724
725        let old = counter.handle(Reset, &mut ctx).await;
726        assert_eq!(old, 30);
727        assert_eq!(counter.count, 0);
728    }
729
730    #[test]
731    fn test_handler_requires_actor_bound() {
732        fn assert_handler<A: Handler<M>, M: Message>() {}
733        assert_handler::<Counter, Increment>();
734        assert_handler::<Counter, GetCount>();
735        assert_handler::<Counter, Reset>();
736    }
737
738    #[test]
739    fn test_actor_error_construction() {
740        let err = ActorError::new(ErrorCode::InvalidArgument, "bad input");
741        assert_eq!(err.code, ErrorCode::InvalidArgument);
742        assert_eq!(err.message, "bad input");
743        assert!(err.details.is_none());
744        assert!(err.cause.is_none());
745    }
746
747    #[test]
748    fn test_actor_error_with_details() {
749        let err = ActorError::new(ErrorCode::NotFound, "user not found").with_details("user_id=42");
750        assert_eq!(err.details.as_deref(), Some("user_id=42"));
751    }
752
753    #[test]
754    fn test_actor_error_chain() {
755        let root = ActorError::new(ErrorCode::Unavailable, "db connection failed");
756        let err = ActorError::new(ErrorCode::Internal, "query failed").with_cause(root);
757        assert!(err.cause.is_some());
758        assert_eq!(err.cause.as_ref().unwrap().code, ErrorCode::Unavailable);
759    }
760
761    #[test]
762    fn test_actor_error_display() {
763        let err = ActorError::new(ErrorCode::Internal, "something broke")
764            .with_details("stack: foo.rs:42");
765        let display = format!("{}", err);
766        assert!(display.contains("Internal"));
767        assert!(display.contains("something broke"));
768        assert!(display.contains("stack: foo.rs:42"));
769    }
770
771    #[test]
772    fn test_actor_error_display_with_chain() {
773        let root = ActorError::new(ErrorCode::Unavailable, "db down");
774        let err = ActorError::new(ErrorCode::Internal, "query failed").with_cause(root);
775        let display = format!("{}", err);
776        assert!(display.contains("caused by"));
777        assert!(display.contains("db down"));
778    }
779
780    #[test]
781    fn test_error_code_variants() {
782        let codes = vec![
783            ErrorCode::Internal,
784            ErrorCode::InvalidArgument,
785            ErrorCode::NotFound,
786            ErrorCode::Unavailable,
787            ErrorCode::Timeout,
788            ErrorCode::PermissionDenied,
789            ErrorCode::FailedPrecondition,
790            ErrorCode::ResourceExhausted,
791            ErrorCode::Unimplemented,
792            ErrorCode::Unknown,
793            ErrorCode::Cancelled,
794        ];
795        assert_eq!(codes.len(), 11);
796        // All distinct
797        for (i, a) in codes.iter().enumerate() {
798            for (j, b) in codes.iter().enumerate() {
799                if i != j {
800                    assert_ne!(a, b);
801                }
802            }
803        }
804    }
805
806    #[test]
807    fn test_actor_error_internal_helper() {
808        let err = ActorError::internal("oops");
809        assert_eq!(err.code, ErrorCode::Internal);
810        assert_eq!(err.message, "oops");
811    }
812
813    #[test]
814    fn test_not_supported_error() {
815        use crate::errors::NotSupportedError;
816        let err = NotSupportedError {
817            capability: "BoundedMailbox".into(),
818            message: "ractor does not support bounded mailboxes".into(),
819        };
820        assert!(format!("{}", err).contains("BoundedMailbox"));
821    }
822
823    #[test]
824    fn test_runtime_error_not_supported() {
825        use crate::errors::NotSupportedError;
826        let err = RuntimeError::NotSupported(NotSupportedError {
827            capability: "PriorityMailbox".into(),
828            message: "not available".into(),
829        });
830        assert!(format!("{}", err).contains("PriorityMailbox"));
831    }
832}