pub struct Addr<A: Actor> { /* private fields */ }
Expand description
The address of an actor.
Implementations§
source§impl<A: Actor> Addr<A>
impl<A: Actor> Addr<A>
pub fn new(tx: AddressSender<A>) -> Addr<A>
sourcepub fn do_send<M>(&self, msg: M)
pub fn do_send<M>(&self, msg: M)
Sends a message unconditionally, ignoring any potential errors.
The message is always queued, even if the mailbox for the receiver is full. If the mailbox is closed, the message is silently dropped.
Examples found in repository?
examples/weak_addr.rs (line 29)
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
/// Pings every client that is still reachable, then prunes the dead ones.
///
/// Each entry in `self.clients` is upgraded; on success a `TimePing` carrying
/// the current `Instant` is sent with `do_send` and the send is logged,
/// otherwise the failed upgrade is logged. Afterwards every entry that can no
/// longer be upgraded is removed and the remaining client count is printed.
fn send_tick(&mut self, _ctx: &mut Context<Self>) {
    for client in self.clients.iter() {
        if let Some(client) = client.upgrade() {
            client.do_send(TimePing(Instant::now()));
            println!("⏰ sent ping to client {:?}", client);
        } else {
            println!("⏰ client can no longer be upgraded");
        }
    }

    // gc: `retain` drops the entries that fail to upgrade in place, keeping
    // the survivors in order without building a second collection.
    self.clients.retain(|c| c.upgrade().is_some());
    println!("⏰ service has {} clients", self.clients.len());
}
sourcepub fn try_send<M>(&self, msg: M) -> Result<(), SendError<M>>
pub fn try_send<M>(&self, msg: M) -> Result<(), SendError<M>>
Tries to send a message.
This method fails if the actor’s mailbox is full or closed. This method registers the current task in the receiver’s queue.
sourcepub fn send<M>(&self, msg: M) -> Request<A, M>
pub fn send<M>(&self, msg: M) -> Request<A, M>
Sends an asynchronous message and waits for a response.
The communication channel to the actor is bounded. If the returned request future gets dropped, the message is cancelled.
Examples found in repository?
More examples
examples/weak_recipient.rs (line 89)
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
/// Logs the ClientA start-up notice and registers with the time service.
///
/// A `RegisterForTime` message wrapping a weak recipient derived from this
/// actor's own address is sent to the `TimeService` obtained from the
/// registry. The reply is discarded and the resulting future is spawned on
/// `ctx`.
fn started(&mut self, ctx: &mut Self::Context) {
    println!("🐰 starting ClientA");

    let time_service = TimeService::from_registry();
    let weak_recipient = ctx.address().downgrade().recipient();

    time_service
        .send(RegisterForTime(weak_recipient))
        .into_actor(self)
        .then(|_, _slf, _| fut::ready(()))
        .spawn(ctx);
}
/// Prints the ClientA "stopping" notice and returns `Running::Stop`.
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
println!("🐰 stopping ClientA");
Running::Stop
}
/// Prints the ClientA "stopped" notice; the context argument is unused.
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("🐰 stopped ClientA");
}
}
/// `TimePing` handling for `ClientA`: the ping's payload is printed and
/// nothing is returned to the sender beyond `()`.
impl Handler<TimePing> for ClientA {
    type Result = ();

    /// Prints the value carried in the first field of `msg`.
    fn handle(&mut self, msg: TimePing, _ctx: &mut Self::Context) -> Self::Result {
        let payload = &msg.0;
        println!("🐰 ClientA received ping: {:?}", payload);
    }
}
/// Unit struct for the second example client; derives `Debug` and `Default`.
#[derive(Debug, Default)]
pub struct ClientB;
impl Actor for ClientB {
type Context = Context<Self>;
/// Logs the ClientB start-up notice and registers with the time service.
///
/// A `RegisterForTime` message wrapping a weak recipient derived from this
/// actor's own address is sent to the `TimeService` obtained from the
/// registry. The reply is discarded and the resulting future is spawned on
/// `ctx`.
fn started(&mut self, ctx: &mut Self::Context) {
    println!("🐇 starting ClientB");

    let time_service = TimeService::from_registry();
    let weak_recipient = ctx.address().downgrade().recipient();

    time_service
        .send(RegisterForTime(weak_recipient))
        .into_actor(self)
        .then(|_, _slf, _| fut::ready(()))
        .spawn(ctx);
}
examples/ring.rs (line 100)
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/// Builds a chain of `Node` actors, sends the first `Payload` and reports
/// the elapsed time and message rate.
///
/// Nodes `1..n_nodes` are started inside the `Node::create` closure: node 1
/// forwards to the address of the node being created, and every later node
/// forwards to the one started before it. The created node (id `n_nodes`)
/// forwards to the last started node. The clock starts before the setup and
/// is read after `sys.run()` returns.
fn main() -> io::Result<()> {
    let sys = System::new();
    let (n_nodes, n_rounds) = parse_args();
    let now = SystemTime::now();

    sys.block_on(async {
        println!("Setting up {} nodes", n_nodes);
        let limit = n_nodes * n_rounds;
        let node = Node::create(move |ctx| {
            let first_addr = ctx.address();
            let mut prev_addr = Node {
                id: 1,
                limit,
                next: first_addr.recipient(),
            }
            .start();
            for id in 2..n_nodes {
                prev_addr = Node {
                    id,
                    limit,
                    next: prev_addr.recipient(),
                }
                .start();
            }
            Node {
                id: n_nodes,
                limit,
                next: prev_addr.recipient(),
            }
        });
        println!(
            "Sending start message and waiting for termination after {} messages...",
            limit
        );
        node.send(Payload(1)).await.unwrap();
    });
    sys.run().unwrap();

    match now.elapsed() {
        Ok(elapsed) => {
            // Widen to u128 before multiplying so the scaled message count
            // cannot overflow the narrower integer type, and clamp the divisor
            // so a run shorter than one microsecond does not divide by zero.
            let scaled_messages = n_nodes as u128 * n_rounds as u128 * 1_000_000;
            let micros = elapsed.as_micros().max(1);
            println!(
                "Time taken: {}.{:06} seconds ({} msg/second)",
                elapsed.as_secs(),
                elapsed.subsec_micros(),
                scaled_messages / micros
            );
        }
        Err(e) => println!("An error occurred: {:?}", e),
    }
    Ok(())
}
sourcepub fn recipient<M>(self) -> Recipient<M>
pub fn recipient<M>(self) -> Recipient<M>
Returns the `Recipient` for a specific message type.
Examples found in repository?
examples/ring.rs (line 75)
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
/// Builds a chain of `Node` actors, sends the first `Payload` and reports
/// the elapsed time and message rate.
///
/// Nodes `1..n_nodes` are started inside the `Node::create` closure: node 1
/// forwards to the address of the node being created, and every later node
/// forwards to the one started before it. The created node (id `n_nodes`)
/// forwards to the last started node. The clock starts before the setup and
/// is read after `sys.run()` returns.
fn main() -> io::Result<()> {
    let sys = System::new();
    let (n_nodes, n_rounds) = parse_args();
    let now = SystemTime::now();

    sys.block_on(async {
        println!("Setting up {} nodes", n_nodes);
        let limit = n_nodes * n_rounds;
        let node = Node::create(move |ctx| {
            let first_addr = ctx.address();
            let mut prev_addr = Node {
                id: 1,
                limit,
                next: first_addr.recipient(),
            }
            .start();
            for id in 2..n_nodes {
                prev_addr = Node {
                    id,
                    limit,
                    next: prev_addr.recipient(),
                }
                .start();
            }
            Node {
                id: n_nodes,
                limit,
                next: prev_addr.recipient(),
            }
        });
        println!(
            "Sending start message and waiting for termination after {} messages...",
            limit
        );
        node.send(Payload(1)).await.unwrap();
    });
    sys.run().unwrap();

    match now.elapsed() {
        Ok(elapsed) => {
            // Widen to u128 before multiplying so the scaled message count
            // cannot overflow the narrower integer type, and clamp the divisor
            // so a run shorter than one microsecond does not divide by zero.
            let scaled_messages = n_nodes as u128 * n_rounds as u128 * 1_000_000;
            let micros = elapsed.as_micros().max(1);
            println!(
                "Time taken: {}.{:06} seconds ({} msg/second)",
                elapsed.as_secs(),
                elapsed.subsec_micros(),
                scaled_messages / micros
            );
        }
        Err(e) => println!("An error occurred: {:?}", e),
    }
    Ok(())
}
sourcepub fn downgrade(&self) -> WeakAddr<A>
pub fn downgrade(&self) -> WeakAddr<A>
Returns a downgraded `WeakAddr`.
Examples found in repository?
More examples
examples/weak_recipient.rs (line 89)
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
/// Logs the ClientA start-up notice and registers with the time service.
///
/// A `RegisterForTime` message wrapping a weak recipient derived from this
/// actor's own address is sent to the `TimeService` obtained from the
/// registry. The reply is discarded and the resulting future is spawned on
/// `ctx`.
fn started(&mut self, ctx: &mut Self::Context) {
    println!("🐰 starting ClientA");

    let time_service = TimeService::from_registry();
    let weak_recipient = ctx.address().downgrade().recipient();

    time_service
        .send(RegisterForTime(weak_recipient))
        .into_actor(self)
        .then(|_, _slf, _| fut::ready(()))
        .spawn(ctx);
}
/// Prints the ClientA "stopping" notice and returns `Running::Stop`.
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
println!("🐰 stopping ClientA");
Running::Stop
}
/// Prints the ClientA "stopped" notice; the context argument is unused.
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("🐰 stopped ClientA");
}
}
/// `TimePing` handling for `ClientA`: the ping's payload is printed and
/// nothing is returned to the sender beyond `()`.
impl Handler<TimePing> for ClientA {
    type Result = ();

    /// Prints the value carried in the first field of `msg`.
    fn handle(&mut self, msg: TimePing, _ctx: &mut Self::Context) -> Self::Result {
        let payload = &msg.0;
        println!("🐰 ClientA received ping: {:?}", payload);
    }
}
/// Unit struct for the second example client; derives `Debug` and `Default`.
#[derive(Debug, Default)]
pub struct ClientB;
impl Actor for ClientB {
type Context = Context<Self>;
/// Logs the ClientB start-up notice and registers with the time service.
///
/// A `RegisterForTime` message wrapping a weak recipient derived from this
/// actor's own address is sent to the `TimeService` obtained from the
/// registry. The reply is discarded and the resulting future is spawned on
/// `ctx`.
fn started(&mut self, ctx: &mut Self::Context) {
    println!("🐇 starting ClientB");

    let time_service = TimeService::from_registry();
    let weak_recipient = ctx.address().downgrade().recipient();

    time_service
        .send(RegisterForTime(weak_recipient))
        .into_actor(self)
        .then(|_, _slf, _| fut::ready(()))
        .spawn(ctx);
}
Trait Implementations§
source§impl<A, M: Message + Send + 'static> From<Addr<A>> for WeakRecipient<M>
impl<A, M: Message + Send + 'static> From<Addr<A>> for WeakRecipient<M>
source§fn from(addr: Addr<A>) -> WeakRecipient<M>
fn from(addr: Addr<A>) -> WeakRecipient<M>
Converts to this type from the input type.
source§impl<A, M, B> MessageResponse<A, M> for Addr<B>
impl<A, M, B> MessageResponse<A, M> for Addr<B>
source§impl<A: Actor> PartialEq for Addr<A>
impl<A: Actor> PartialEq for Addr<A>
impl<A: Actor> Eq for Addr<A>
Auto Trait Implementations§
impl<A> !RefUnwindSafe for Addr<A>
impl<A> Send for Addr<A>
impl<A> Sync for Addr<A>
impl<A> Unpin for Addr<A>
impl<A> !UnwindSafe for Addr<A>
Blanket Implementations§
source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more