palladium_actor/stable.rs
1use crate::errors::{AskError, SendError};
2use crate::handle::Addr;
3use crate::message::Message;
4use crate::path::AddrHash;
5use tokio::sync::watch;
6
7/// Implemented by types that are notified when the underlying actor is
8/// restarted with a new [`AddrHash`].
9///
10/// The supervisor calls [`refresh`] once after each spawn (initial and
11/// restart). Implementations update an indirect address reference so that
12/// holders of a [`StableAddr`] transparently route to the new incarnation.
13///
14/// Implementations must be `Send + Sync` because the supervisor may run on a
15/// different thread from the handle holders.
16pub trait AddrRefresher: Send + Sync {
17 fn refresh(&self, new_hash: AddrHash);
18}
19
20/// A typed capability handle that remains valid across supervisor restarts.
21///
22/// Internally wraps a [`tokio::sync::watch`] channel whose value is updated
23/// by the supervisor via an [`AddrRefresher`] after each actor (re)spawn.
24/// Callers holding a `StableAddr` always route to the current incarnation
25/// without needing to obtain a new handle.
26///
27/// # Liveness window
28///
29/// There is a brief window between actor death and the supervisor's first
30/// `refresh()` call during which `send()` and `ask()` return
31/// `Err(SendError::Unroutable)`. Callers should treat these as transient.
32pub struct StableAddr<M: Message> {
33 rx: watch::Receiver<Option<Addr<M>>>,
34}
35
36impl<M: Message> Clone for StableAddr<M> {
37 fn clone(&self) -> Self {
38 Self {
39 rx: self.rx.clone(),
40 }
41 }
42}
43
44impl<M: Message> StableAddr<M> {
45 /// Create a `StableAddr` from a raw watch receiver.
46 ///
47 /// Called by `make_stable_addr` in `pd-runtime`; not intended for direct
48 /// use in application code.
49 pub fn from_receiver(rx: watch::Receiver<Option<Addr<M>>>) -> Self {
50 Self { rx }
51 }
52
53 /// Wrap a plain [`Addr<M>`] in a `StableAddr` that is never refreshed.
54 ///
55 /// The wrapped address is permanently bound to the given incarnation.
56 /// Useful when passing a plain address to an API that expects `StableAddr`.
57 pub fn from_addr(addr: Addr<M>) -> Self {
58 // Dropping the sender leaves the channel closed but the last value
59 // (Some(addr)) remains accessible via borrow(), so sends succeed.
60 let (_tx, rx) = watch::channel(Some(addr));
61 Self { rx }
62 }
63
64 /// Non-blocking fire-and-forget send to the current incarnation.
65 pub fn send(&self, msg: M) -> Result<(), SendError> {
66 match &*self.rx.borrow() {
67 Some(addr) => addr.send(msg),
68 None => Err(SendError::Unroutable),
69 }
70 }
71
72 /// Request/response send to the current incarnation.
73 pub async fn ask(&self, msg: M) -> Result<M::Response, AskError> {
74 // Clone to release the borrow before awaiting.
75 let addr = self.rx.borrow().clone();
76 match addr {
77 Some(addr) => addr.ask(msg).await,
78 None => Err(AskError::Send(SendError::Unroutable)),
79 }
80 }
81
82 /// Snapshot the current underlying [`Addr<M>`], if any.
83 pub fn current(&self) -> Option<Addr<M>> {
84 self.rx.borrow().clone()
85 }
86}