Skip to main content

palladium_runtime/
stable_addr.rs

1use crate::addr::{make_ask_fn, make_send_fn};
2use crate::reactor::Reactor;
3use crate::responses::ResponseRegistry;
4use palladium_actor::{Addr, AddrHash, AddrRefresher, Message, StableAddr};
5use palladium_transport::{InProcessTransport, TransportRegistry};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::watch;
9
10// Used by make_stable_addr; callers exist in fabric code and tests.
11#[allow(dead_code)]
12struct StableAddrRefresherImpl<M: Message> {
13    tx: watch::Sender<Option<Addr<M>>>,
14    make_addr: Arc<dyn Fn(AddrHash) -> Addr<M> + Send + Sync>,
15}
16
17impl<M: Message> AddrRefresher for StableAddrRefresherImpl<M> {
18    fn refresh(&self, new_hash: AddrHash) {
19        let addr = (self.make_addr)(new_hash);
20        let _ = self.tx.send(Some(addr));
21    }
22}
23
24/// Create a [`StableAddr<M>`] and its paired [`AddrRefresher`].
25///
26/// Pass the `Arc<dyn AddrRefresher>` to [`ChildSpec::with_stable_addr`] before
27/// spawning the child. The supervisor calls `refresh` after each (re)spawn,
28/// keeping the `StableAddr` current without any action by the caller.
29///
30/// `source` is the `AddrHash` of the logical sender (used as the reply address
31/// for `ask`). Use `AddrHash::synthetic(b"…")` for infrastructure or test
32/// senders that have no actor path.
33///
34/// # Example
35///
36/// ```ignore
37/// let (stable, refresher) = make_stable_addr::<MyMsg, TokioReactor>(
38///     source_hash,
39///     transport.clone(),
40///     transport_registry.clone(),
41///     responses.clone(),
42///     Duration::from_secs(5),
43///     TokioReactor,
44/// );
45/// let spec = ChildSpec::new("worker", RestartPolicy::Permanent, ...)
46///     .with_stable_addr(refresher);
47/// ```
48#[allow(dead_code)]
49pub(crate) fn make_stable_addr<M, R>(
50    source: AddrHash,
51    transport: Arc<InProcessTransport>,
52    transport_registry: Arc<TransportRegistry>,
53    responses: Arc<ResponseRegistry>,
54    ask_timeout: Duration,
55    reactor: R,
56) -> (StableAddr<M>, Arc<dyn AddrRefresher>)
57where
58    M: Message,
59    R: Reactor + Clone + Send + Sync + 'static,
60{
61    let transport_for_ask = transport.clone();
62    let tr = transport_registry.clone();
63    let res = responses.clone();
64    let r = reactor.clone();
65
66    let make_addr: Arc<dyn Fn(AddrHash) -> Addr<M> + Send + Sync> = Arc::new(move |target| {
67        let send_fn = make_send_fn::<M>(target, source, tr.clone());
68        let ask_fn = make_ask_fn::<M, R>(
69            target,
70            source,
71            transport_for_ask.clone(),
72            tr.clone(),
73            res.clone(),
74            ask_timeout,
75            r.clone(),
76        );
77        Addr::with_handlers(target, send_fn, ask_fn)
78    });
79
80    let (tx, rx) = watch::channel(None);
81    let refresher = Arc::new(StableAddrRefresherImpl { tx, make_addr });
82    let stable = StableAddr::from_receiver(rx);
83    (stable, refresher)
84}