Struct crossbus::context::Context

source ·
pub struct Context<A: Actor> { /* private fields */ }
Expand description

execution state and local environment for actor during runtime

Implementations§

source§

impl<A: Actor> Context<A>

source

pub fn new() -> Self

create an instance of Context

struct CrossBus {}
impl Actor for CrossBus {
...
}

let ctx = Context::<CrossBus>::new();
let addr = ctx.address();
source

pub fn address(&self) -> Addr<A::Message>

get the address of the actor, which can later be used to obtain its Sender or Receiver

let addr = ctx.address();
let sender = addr.clone().sender();
let receiver = addr.receiver();
...
source

pub fn id(&self) -> ActorId

get the id of the actor that owns this Context

source

pub fn spawn<F>(&mut self, f: F) -> Handle where F: Future<A, Output = ()> + 'static,

spawn a new future for execution

use crossbus::prelude::*;

async fn run() {
  // do work here
}

impl Actor for CrossBus {
    fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
        // spawn the `run` for execution
        let dur = core::time::Duration::from_secs(1);
        let local = Localizer::new(run());
        ctx.spawn(local);
    }
}
source

pub fn blocking<F>(&mut self, f: F) -> Handle where F: Future<A, Output = ()> + 'static, A: Blocking<A>,

block the actor from receiving messages until the spawned future is completed

use crossbus::prelude::*;

impl Actor for CrossBus {

    fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
        // block the actor for 1 second
        let dur = core::time::Duration::from_secs(1);
        let sleep_fut = tokio::time::sleep(dur);
        let local = Localizer::new(sleep_fut);
        ctx.blocking(local);
    }
}
source

pub fn blocking_duration<T>(&mut self, duration: Duration) -> Handle where A: Blocking<A>, T: Timing + 'static,

Available on crate feature time only.

block the actor from receiving messages for the specified duration

the crate feature time must be enabled, and a Timing implementation is required so that the actor can know the time

use crossbus::prelude::*;

impl Actor for CrossBus {

    fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
        // block the actor for 1 second
        let dur = core::time::Duration::from_secs(1);
        ctx.blocking_duration::<std::time::Instant>(dur);
    }
}
source

pub fn send_message( &mut self, msg: A::Message ) -> Result<(), QueueError<A::Message>> where A::Message: Message + Send + 'static,

send message to the queue

messages may be rejected when actor is blocked or message queue is full/closed

struct Num(isize);
impl Message for Num {}

struct CrossBus{
    sum: isize
}
impl Actor for CrossBus {
    type Message = Num;

    fn create(ctx: &mut Context<Self>) -> Self {
        Self { sum: 0, }
    }

    fn started(&mut self, ctx: &mut Context<Self>) {
        ctx.send_message(Num(1));
        ctx.send_message(Num(1));
        ctx.send_message(Num(1));
    }

    fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
        self.sum += msg.0;
    }
}
source

pub fn send_message_batch( &mut self, msgs: Vec<A::Message> ) -> Vec<Result<(), QueueError<A::Message>>> where A::Message: Message + Send + 'static,

send a batch of messages to the queue

messages may be rejected when actor is blocked or message queue is full/closed

...
    fn started(&mut self, ctx: &mut Context<Self>) {
        ctx.send_message_batch(vec![Num(1), Num(1), Num(1)]);
    }
...
source

pub fn send_future<F>(&mut self, fut: F) -> Handle where F: CoreFuture<Output = ()> + 'static,

send and execute normal future

use crossbus::prelude::*;

// returns unit type `()`
async fn run() {
    // do work here
}

impl Actor for CrossBus {

    fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
        let dur = core::time::Duration::from_secs(1);
        ctx.send_future(run());
    }
}
source

pub fn instant_message(&mut self, msg: A::Message) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>,

instantly deliver a message

NOTE that the message bypasses the message queue and gets handled by Actor::action directly

...
    fn started(&mut self, ctx: &mut Context<Self>) {
        ctx.instant_message(Num(1));
    }
...
source

pub fn delay_message<T: Timing + 'static>( &mut self, msg: A::Message, delay: Duration ) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>,

Available on crate feature time only.

deliver a message after the specified delay duration

NOTE that the message bypasses the message queue and gets handled by Actor::action directly

the crate feature time must be enabled, and a Timing implementation is required so that the actor can know the time

...
    fn started(&mut self, ctx: &mut Context<Self>) {
        let dur = core::time::Duration::from_secs(1);
        ctx.delay_message(Num(1), dur);
    }
...
source

pub fn delay_message_fn<F>(&mut self, msg: A::Message, f: F) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>, F: FnMut(&mut A, &mut Context<A>, &mut CoreContext<'_>) -> Poll<Indicator<A::Message>> + 'static + Unpin,

deliver a message with the specified function; the message is delayed for as long as f returns Poll::Pending

NOTE that the message bypasses the message queue and gets handled by Actor::action directly

source

pub unsafe fn repeat_message<T: Timing + 'static>( &mut self, message: A::Message, dur: Option<Duration>, repeats: Option<usize> ) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>,

Available on crate feature time only.

repeatedly deliver a message with specified times and interval duration

the crate feature time must be enabled, and a Timing implementation is required so that the actor can know the time

Safety: Since the same message is consumed multiple times, which involves memory safety, mutating the message is NOT allowed here! E.g. it’s ILLEGAL to mutate it in Actor::action or anywhere else; doing so could lead to unexpected behavior. It’s the caller’s responsibility to prevent this.

NOTE that the message bypasses the message queue and gets handled by Actor::action directly

the argument repeats is an Option<usize> and has three scenarios:

  • None: Infinitely deliver the message
  • Some(0): Just deliver the message Once
  • Some(number): Deliver the message number times
...
    fn started(&mut self, ctx: &mut Context<Self>) {
        // repeat 3 times send message with 1s interval
        let dur = core::time::Duration::from_secs(1);
        ctx.repeat_message(Num(1), Some(dur), Some(3));
    }
...
source

pub fn delay_fn<F>(&mut self, f: F) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>, F: FnMut(&mut A, &mut Context<A>, &mut CoreContext<'_>) -> Poll<Indicator<A::Message>> + 'static + Unpin,

deliver a message produced by the specified function; delivery is delayed for as long as f returns Poll::Pending

the function’s Output MUST be Indicator::Message; if not, the error will be logged and the output ignored

NOTE that the message bypasses the message queue and gets handled by Actor::action directly

source

pub fn streaming<S>(&mut self, s: S) -> Handle where S: CoreStream + 'static, A: Stream<S::Item>,

spawn a stream into the actor

NOTE that the stream item will be processed by Stream::action NOT Actor::action

NOTE that the stream item bypasses the message queue and gets handled by Stream::action directly

use crossbus::prelude::*;

struct St { items: Vec<i32> }
impl Stream for St {
    // impl stream for St here
    ...
}

impl Actor for CrossBus {

    fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
        // spawn stream
        let st = St {items: vec![1, 1, 1]};
        ctx.streaming(st);
    }
}
source

pub fn streaming_message<S>(&mut self, s: S) -> Handle where A: Actor<Message = S::Item> + MStream<S::Item>, S: CoreStream + 'static, S::Item: Message,

spawn a message stream into the actor

NOTE that each stream item is a message and will be processed by Actor::action, NOT Stream::action

NOTE that the message bypasses the message queue and gets handled by Actor::action directly

use crossbus::prelude::*;

struct St { items: Vec<i32> }
impl Stream for St {
    // impl stream for St here
    ...
}

impl Actor for CrossBus {

    fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
        // spawn stream
        let st = St {items: vec![Num(1), Num(1), Num(1)]};
        ctx.streaming_message(st);
    }
}
source

pub fn sender(&self) -> Sender<A::Message>

get the sender of the actor

source

pub fn receiver(&self) -> Receiver<A::Message>

get the receiver of the actor

source

pub fn abort_future(&mut self, handle: Handle)

abort a future

source

pub fn state(&self) -> ActorState

get the current state of the actor

source

pub fn set_state(&mut self, state: ActorState)

set the state of the actor

source

pub fn run(self, act: ActorGuard<A>) -> Addr<A::Message>

run the actor

source

pub fn restart(&mut self)

restart the context, it will

  • flush all blockers
  • flush all running tasks but the message queue will survive
source

pub fn stop(&mut self)

stop the context and the actor

it can be restored if Actor::state returns ActingState::Resume; if not, the actor will be stopped

source

pub fn downcast_ref<T: 'static>(&self, handle: Handle) -> Option<&T>

downcast the inner future into type &T, where T is one of the following four types:

source

pub fn downcast_mut<T: 'static>( &mut self, handle: Handle ) -> Option<Pin<&mut T>>

downcast the inner future into type &mut T, where T is one of the following four types:

Trait Implementations§

source§

impl<A: Actor> Debug for Context<A>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<A> !RefUnwindSafe for Context<A>

§

impl<A> !Send for Context<A>

§

impl<A> !Sync for Context<A>

§

impl<A> Unpin for Context<A>

§

impl<A> !UnwindSafe for Context<A>

Blanket Implementations§

source§

impl<T> Any for T where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T where T: ?Sized,

const: unstable · source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T where T: ?Sized,

const: unstable · source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

const: unstable · source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T where U: From<T>,

const: unstable · source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
const: unstable · source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
const: unstable · source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.