Skip to main content

Context

Struct Context 

Source
pub struct Context<A> { /* private fields */ }
Expand description

An actor execution context.

Implementations§

Source§

impl<A> Context<A>

Source

pub fn address(&self) -> Addr<A>

Returns the address of the actor.

Examples found in repository?
examples/subscriber.rs (line 36)
34    async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
35        println!("Subscriber Parent Started");
36        let _ = ctx.address().send(InitializeChildSubscribers);
37        Ok(())
38    }
39}
40
41#[message]
42struct InitializeChildSubscribers;
43
44#[async_trait::async_trait]
45impl Handler<InitializeChildSubscribers> for SubscriberParent {
46    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: InitializeChildSubscribers) {
47        let message_producer_addr = self.message_producer.clone();
48        let dummy_ids: Vec<i32> = vec![1, 2, 3, 4, 5];
49        let children_unstarted_actors_vec = dummy_ids.into_iter().map(move |id| {
50            let id = id.clone();
51            let addr = message_producer_addr.clone();
52
53            Subscriber::new(id, addr)
54        });
55
56        let children_addr_vec = children_unstarted_actors_vec
57            .into_iter()
58            .map(|actor| async { actor.start().await.unwrap() });
59
60        let children_addr_vec = futures::future::join_all(children_addr_vec).await;
61
62        self.children_subscribers = children_addr_vec;
63    }
64}
65
66#[message]
67struct ClearChildSubscribers;
68
69#[async_trait::async_trait]
70impl Handler<ClearChildSubscribers> for SubscriberParent {
71    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ClearChildSubscribers) {
72        self.children_subscribers = Vec::new();
73    }
74}
75
76// (Child) Subscriber - B
77
78struct Subscriber {
79    id: i32,
80    message_producer_addr: Addr<MessageProducer>,
81}
82
83impl Subscriber {
84    fn new(id: i32, message_producer_addr: Addr<MessageProducer>) -> Subscriber {
85        Subscriber {
86            id,
87            message_producer_addr,
88        }
89    }
90}
91
92#[async_trait::async_trait]
93impl Actor for Subscriber {
94    async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
95        // Send subscription request message to the Message Producer
96        println!("Child Subscriber Started - id {:?}", self.id);
97        let self_sender = ctx.address().sender();
98        let _ = self.message_producer_addr.send(SubscribeToProducer {
99            sender: self_sender,
100        });
101        Ok(())
102    }
Source

pub fn actor_id(&self) -> ActorId

Returns the id of the actor.

Examples found in repository?
examples/supervisor_clear_interval.rs (line 41)
37    async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Ping) {
38        let now = Instant::now();
39        let delta = (now - self.last_ping).as_millis();
40        self.last_ping = now;
41        println!("PingTimer:: Ping {} {:?}", ctx.actor_id(), delta);
42    }
Source

pub fn stop(&self, err: Option<Error>)

Stops the actor. Pass None as err to stop without an error.

Examples found in repository?
examples/main.rs (line 22)
20    async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Die) {
21        // Stop the actor without error
22        ctx.stop(None);
23    }
More examples
Hide additional examples
examples/supervisor_clear_interval.rs (line 51)
49    async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Halt) {
50        println!("PingTimer:: received Halt");
51        ctx.stop(None);
52        println!("PingTimer:: stopped");
53    }
examples/supervisor_clear_send_later.rs (line 37)
35    async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Halt) {
36        println!("PingLater:: received Halt");
37        ctx.stop(None);
38        println!("PingLater:: stopped");
39    }
Source

pub fn abort_intervals(&mut self)

Source

pub fn abort_streams(&mut self)

Source

pub fn add_stream<S>(&mut self, stream: S)
where S: Stream + Unpin + Send + 'static, S::Item: 'static + Send, A: StreamHandler<S::Item>,

Creates a stream handler for the actor.

§Examples
use xactor::*;
use futures::stream;
use std::time::Duration;

#[message(result = "i32")]
struct GetSum;

#[derive(Default)]
struct MyActor(i32);

#[async_trait::async_trait]
impl StreamHandler<i32> for MyActor {
    async fn handle(&mut self, _ctx: &mut Context<Self>, msg: i32) {
        self.0 += msg;
    }

    async fn started(&mut self, _ctx: &mut Context<Self>) {
        println!("stream started");
    }

    async fn finished(&mut self, _ctx: &mut Context<Self>) {
        println!("stream finished");
    }
}

#[async_trait::async_trait]
impl Handler<GetSum> for MyActor {
    async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: GetSum) -> i32 {
        self.0
    }
}

#[async_trait::async_trait]
impl Actor for MyActor {
    async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
        let values = (0..100).collect::<Vec<_>>();
        ctx.add_stream(stream::iter(values));
        Ok(())
    }
}

#[xactor::main]
async fn main() -> Result<()> {
    let mut addr = MyActor::start_default().await?;
    sleep(Duration::from_secs(1)).await; // Wait for the stream to complete
    let res = addr.call(GetSum).await?;
    assert_eq!(res, (0..100).sum::<i32>());
    Ok(())
}
Source

pub fn send_later<T>(&mut self, msg: T, after: Duration)
where A: Handler<T>, T: Message<Result = ()>,

Sends the message msg to self after a specified period of time.

We use Sender instead of Addr so that the delayed send doesn’t keep a reference to the address and thereby prevent the actor from being dropped and stopped.

Examples found in repository?
examples/supervisor_clear_send_later.rs (line 10)
9    async fn started(&mut self, ctx: &mut Context<Self>) -> xactor::Result<()> {
10        ctx.send_later(Ping("after halt"), Duration::from_millis(1_500));
11
12        Ok(())
13    }
More examples
Hide additional examples
examples/main.rs (line 13)
11    async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
12        // Send the Die message 3 seconds later
13        ctx.send_later(Die, Duration::from_secs(3));
14        Ok(())
15    }
Source

pub fn send_interval_with<T, F>(&mut self, f: F, dur: Duration)
where A: Handler<T>, F: Fn() -> T + Sync + Send + 'static, T: Message<Result = ()>,

Sends the message to self, at a specified fixed interval. The message is created each time using a closure f.

Source

pub fn send_interval<T>(&mut self, msg: T, dur: Duration)
where A: Handler<T>, T: Message<Result = ()> + Clone + Sync,

Sends the message msg to self, at a specified fixed interval.

Examples found in repository?
examples/supervisor_clear_interval.rs (line 21)
19    async fn started(&mut self, ctx: &mut Context<Self>) -> xactor::Result<()> {
20        println!("PingTimer:: started()");
21        ctx.send_interval(Ping, Duration::from_millis(1000));
22        Ok(())
23    }
More examples
Hide additional examples
examples/subscriber.rs (line 135)
132    async fn started(&mut self, ctx: &mut Context<Self>) -> Result<()> {
133        // Send broadcast message to self every 2 seconds
134        println!("Message Producer Started");
135        ctx.send_interval(Broadcast, Duration::from_secs(2));
136        Ok(())
137    }
Source

pub async fn subscribe<T: Message<Result = ()>>(&self) -> Result<()>
where A: Handler<T>,

Subscribes to a message of a specified type.

Source

pub async fn unsubscribe<T: Message<Result = ()>>(&self) -> Result<()>

Unsubscribes from a message of a specified type.

Auto Trait Implementations§

§

impl<A> Freeze for Context<A>

§

impl<A> !RefUnwindSafe for Context<A>

§

impl<A> Send for Context<A>

§

impl<A> Sync for Context<A>

§

impl<A> Unpin for Context<A>

§

impl<A> UnsafeUnpin for Context<A>

§

impl<A> !UnwindSafe for Context<A>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.