Skip to main content

palladium_actor/
stable.rs

1use crate::errors::{AskError, SendError};
2use crate::handle::Addr;
3use crate::message::Message;
4use crate::path::AddrHash;
5use tokio::sync::watch;
6
7/// Implemented by types that are notified when the underlying actor is
8/// restarted with a new [`AddrHash`].
9///
10/// The supervisor calls [`refresh`] once after each spawn (initial and
11/// restart). Implementations update an indirect address reference so that
12/// holders of a [`StableAddr`] transparently route to the new incarnation.
13///
14/// Implementations must be `Send + Sync` because the supervisor may run on a
15/// different thread from the handle holders.
16pub trait AddrRefresher: Send + Sync {
17    fn refresh(&self, new_hash: AddrHash);
18}
19
20/// A typed capability handle that remains valid across supervisor restarts.
21///
22/// Internally wraps a [`tokio::sync::watch`] channel whose value is updated
23/// by the supervisor via an [`AddrRefresher`] after each actor (re)spawn.
24/// Callers holding a `StableAddr` always route to the current incarnation
25/// without needing to obtain a new handle.
26///
27/// # Liveness window
28///
29/// There is a brief window between actor death and the supervisor's first
30/// `refresh()` call during which `send()` and `ask()` return
31/// `Err(SendError::Unroutable)`. Callers should treat these as transient.
32pub struct StableAddr<M: Message> {
33    rx: watch::Receiver<Option<Addr<M>>>,
34}
35
36impl<M: Message> Clone for StableAddr<M> {
37    fn clone(&self) -> Self {
38        Self {
39            rx: self.rx.clone(),
40        }
41    }
42}
43
44impl<M: Message> StableAddr<M> {
45    /// Create a `StableAddr` from a raw watch receiver.
46    ///
47    /// Called by `make_stable_addr` in `pd-runtime`; not intended for direct
48    /// use in application code.
49    pub fn from_receiver(rx: watch::Receiver<Option<Addr<M>>>) -> Self {
50        Self { rx }
51    }
52
53    /// Wrap a plain [`Addr<M>`] in a `StableAddr` that is never refreshed.
54    ///
55    /// The wrapped address is permanently bound to the given incarnation.
56    /// Useful when passing a plain address to an API that expects `StableAddr`.
57    pub fn from_addr(addr: Addr<M>) -> Self {
58        // Dropping the sender leaves the channel closed but the last value
59        // (Some(addr)) remains accessible via borrow(), so sends succeed.
60        let (_tx, rx) = watch::channel(Some(addr));
61        Self { rx }
62    }
63
64    /// Non-blocking fire-and-forget send to the current incarnation.
65    pub fn send(&self, msg: M) -> Result<(), SendError> {
66        match &*self.rx.borrow() {
67            Some(addr) => addr.send(msg),
68            None => Err(SendError::Unroutable),
69        }
70    }
71
72    /// Request/response send to the current incarnation.
73    pub async fn ask(&self, msg: M) -> Result<M::Response, AskError> {
74        // Clone to release the borrow before awaiting.
75        let addr = self.rx.borrow().clone();
76        match addr {
77            Some(addr) => addr.ask(msg).await,
78            None => Err(AskError::Send(SendError::Unroutable)),
79        }
80    }
81
82    /// Snapshot the current underlying [`Addr<M>`], if any.
83    pub fn current(&self) -> Option<Addr<M>> {
84        self.rx.borrow().clone()
85    }
86}