palladium_runtime/
stable_addr.rs1use crate::addr::{make_ask_fn, make_send_fn};
2use crate::reactor::Reactor;
3use crate::responses::ResponseRegistry;
4use palladium_actor::{Addr, AddrHash, AddrRefresher, Message, StableAddr};
5use palladium_transport::{InProcessTransport, TransportRegistry};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::watch;
9
10#[allow(dead_code)]
12struct StableAddrRefresherImpl<M: Message> {
13 tx: watch::Sender<Option<Addr<M>>>,
14 make_addr: Arc<dyn Fn(AddrHash) -> Addr<M> + Send + Sync>,
15}
16
17impl<M: Message> AddrRefresher for StableAddrRefresherImpl<M> {
18 fn refresh(&self, new_hash: AddrHash) {
19 let addr = (self.make_addr)(new_hash);
20 let _ = self.tx.send(Some(addr));
21 }
22}
23
24#[allow(dead_code)]
49pub(crate) fn make_stable_addr<M, R>(
50 source: AddrHash,
51 transport: Arc<InProcessTransport>,
52 transport_registry: Arc<TransportRegistry>,
53 responses: Arc<ResponseRegistry>,
54 ask_timeout: Duration,
55 reactor: R,
56) -> (StableAddr<M>, Arc<dyn AddrRefresher>)
57where
58 M: Message,
59 R: Reactor + Clone + Send + Sync + 'static,
60{
61 let transport_for_ask = transport.clone();
62 let tr = transport_registry.clone();
63 let res = responses.clone();
64 let r = reactor.clone();
65
66 let make_addr: Arc<dyn Fn(AddrHash) -> Addr<M> + Send + Sync> = Arc::new(move |target| {
67 let send_fn = make_send_fn::<M>(target, source, tr.clone());
68 let ask_fn = make_ask_fn::<M, R>(
69 target,
70 source,
71 transport_for_ask.clone(),
72 tr.clone(),
73 res.clone(),
74 ask_timeout,
75 r.clone(),
76 );
77 Addr::with_handlers(target, send_fn, ask_fn)
78 });
79
80 let (tx, rx) = watch::channel(None);
81 let refresher = Arc::new(StableAddrRefresherImpl { tx, make_addr });
82 let stable = StableAddr::from_receiver(rx);
83 (stable, refresher)
84}