pub struct Context<A: Actor> { /* private fields */ }
Expand description
execution state and local environment for an actor during runtime
Implementations§
source§impl<A: Actor> Context<A>
impl<A: Actor> Context<A>
sourcepub fn new() -> Self
pub fn new() -> Self
create an instance of Context
struct CrossBus {}
impl Actor for CrossBus {
...
}
let ctx = Context::<CrossBus>::new();
let addr = ctx.address();
sourcepub fn address(&self) -> Addr<A::Message>
pub fn address(&self) -> Addr<A::Message>
get the address of the actor
which can later be used to get access
to Sender
or Receiver
let addr = ctx.address();
let sender = addr.clone().sender();
let receiver = addr.receiver();
...
sourcepub fn spawn<F>(&mut self, f: F) -> Handlewhere
F: Future<A, Output = ()> + 'static,
pub fn spawn<F>(&mut self, f: F) -> Handle where F: Future<A, Output = ()> + 'static,
spawn a new future for execution
use crossbus::prelude::*;
async fn run() {
// do work here
}
impl Actor for CrossBus {
fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
// spawn the `run` for execution
let dur = core::time::Duration::from_secs(1);
let local = Localizer::new(run());
ctx.spawn(local);
}
}
sourcepub fn blocking<F>(&mut self, f: F) -> Handlewhere
F: Future<A, Output = ()> + 'static,
A: Blocking<A>,
pub fn blocking<F>(&mut self, f: F) -> Handle where F: Future<A, Output = ()> + 'static, A: Blocking<A>,
block the actor from receiving messages until the spawned future is completed
use crossbus::prelude::*;
impl Actor for CrossBus {
fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
// block the actor for 1 second
let dur = core::time::Duration::from_secs(1);
let sleep_fut = tokio::time::sleep(dur);
let local = Localizer::new(sleep_fut);
ctx.blocking(local);
}
}
sourcepub fn blocking_duration<T>(&mut self, duration: Duration) -> Handlewhere
A: Blocking<A>,
T: Timing + 'static,
Available on crate feature time
only.
pub fn blocking_duration<T>(&mut self, duration: Duration) -> Handle where A: Blocking<A>, T: Timing + 'static,
Available on crate feature time
only.
block the actor from receiving messages for the specified duration
feature time
must be enabled,
and a Timing implementation
is required so that the actor can know
the time
use crossbus::prelude::*;
impl Actor for CrossBus {
fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
// block the actor for 1 second
let dur = core::time::Duration::from_secs(1);
ctx.blocking_duration::<std::time::Instant>(dur);
}
}
sourcepub fn send_message(
&mut self,
msg: A::Message
) -> Result<(), QueueError<A::Message>>where
A::Message: Message + Send + 'static,
pub fn send_message( &mut self, msg: A::Message ) -> Result<(), QueueError<A::Message>> where A::Message: Message + Send + 'static,
send message to the queue
messages may be rejected when the actor is blocked or the message queue is full/closed
struct Num(isize);
impl Message for Num {}
struct CrossBus{
sum: isize
}
impl Actor for CrossBus {
type Message = Num;
fn create(ctx: &mut Context<Self>) -> Self {
Self { sum: 0, }
}
fn started(&mut self, ctx: &mut Context<Self>) {
ctx.send_message(Num(1));
ctx.send_message(Num(1));
ctx.send_message(Num(1));
}
fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
self.sum += msg.0;
}
}
sourcepub fn send_message_batch(
&mut self,
msgs: Vec<A::Message>
) -> Vec<Result<(), QueueError<A::Message>>>where
A::Message: Message + Send + 'static,
pub fn send_message_batch( &mut self, msgs: Vec<A::Message> ) -> Vec<Result<(), QueueError<A::Message>>> where A::Message: Message + Send + 'static,
send a batch of messages to the queue
messages may be rejected when the actor is blocked or the message queue is full/closed
...
fn started(&mut self, ctx: &mut Context<Self>) {
ctx.send_message_batch(vec![Num(1), Num(1), Num(1)]);
}
...
sourcepub fn send_future<F>(&mut self, fut: F) -> Handlewhere
F: CoreFuture<Output = ()> + 'static,
pub fn send_future<F>(&mut self, fut: F) -> Handle where F: CoreFuture<Output = ()> + 'static,
send and execute normal future
use crossbus::prelude::*;
// returns unit type `()`
async fn run() {
// do work here
}
impl Actor for CrossBus {
fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
let dur = core::time::Duration::from_secs(1);
ctx.send_future(run());
}
}
sourcepub fn instant_message(&mut self, msg: A::Message) -> Handlewhere
A::Message: Message + Unpin + 'static,
A: Delaying<A>,
pub fn instant_message(&mut self, msg: A::Message) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>,
instantly deliver a message
NOTE that the message bypasses the message queue and gets handled by Actor::action directly
...
fn started(&mut self, ctx: &mut Context<Self>) {
ctx.instant_message(Num(1));
}
...
sourcepub fn delay_message<T: Timing + 'static>(
&mut self,
msg: A::Message,
delay: Duration
) -> Handlewhere
A::Message: Message + Unpin + 'static,
A: Delaying<A>,
Available on crate feature time
only.
pub fn delay_message<T: Timing + 'static>( &mut self, msg: A::Message, delay: Duration ) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>,
Available on crate feature time
only.
deliver a message after the specified delay duration
NOTE that the message bypasses the message queue and gets handled by Actor::action directly
feature time
must be enabled,
and a Timing implementation
is required so that the actor can know
the time
...
fn started(&mut self, ctx: &mut Context<Self>) {
let dur = core::time::Duration::from_secs(1);
ctx.delay_message(Num(1), dur);
}
...
sourcepub fn delay_message_fn<F>(&mut self, msg: A::Message, f: F) -> Handlewhere
A::Message: Message + Unpin + 'static,
A: Delaying<A>,
F: FnMut(&mut A, &mut Context<A>, &mut CoreContext<'_>) -> Poll<Indicator<A::Message>> + 'static + Unpin,
pub fn delay_message_fn<F>(&mut self, msg: A::Message, f: F) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>, F: FnMut(&mut A, &mut Context<A>, &mut CoreContext<'_>) -> Poll<Indicator<A::Message>> + 'static + Unpin,
deliver a message with the specified function;
delivery is delayed for as long as f
returns
Poll::Pending
NOTE that the message bypasses the message queue and gets handled by Actor::action directly
sourcepub unsafe fn repeat_message<T: Timing + 'static>(
&mut self,
message: A::Message,
dur: Option<Duration>,
repeats: Option<usize>
) -> Handlewhere
A::Message: Message + Unpin + 'static,
A: Delaying<A>,
Available on crate feature time
only.
pub unsafe fn repeat_message<T: Timing + 'static>( &mut self, message: A::Message, dur: Option<Duration>, repeats: Option<usize> ) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>,
Available on crate feature time
only.
repeatedly deliver a message the specified number of times at the specified interval
feature time
must be enabled,
and a Timing implementation
is required so that the actor can know
the time
Safety: Since the message is consumed
multiple times,
which involves memory safety, mutating the message is NOT
allowed here!
e.g. it’s ILLEGAL to mutate it in Actor::action or
somewhere else, or else it could lead to unexpected
behavior. It’s the caller’s responsibility to prevent this.
NOTE that the message bypasses the message queue and gets handled by Actor::action directly
the argument repeats
is an Option<usize>
and has three scenarios:
- None: Infinitely deliver the message
- Some(0): Just deliver the message once
- Some(number): Deliver the message
number
times
...
fn started(&mut self, ctx: &mut Context<Self>) {
// repeat 3 times send message with 1s interval
let dur = core::time::Duration::from_secs(1);
ctx.repeat_message(Num(1), Some(dur), Some(3));
}
...
sourcepub fn delay_fn<F>(&mut self, f: F) -> Handlewhere
A::Message: Message + Unpin + 'static,
A: Delaying<A>,
F: FnMut(&mut A, &mut Context<A>, &mut CoreContext<'_>) -> Poll<Indicator<A::Message>> + 'static + Unpin,
pub fn delay_fn<F>(&mut self, f: F) -> Handle where A::Message: Message + Unpin + 'static, A: Delaying<A>, F: FnMut(&mut A, &mut Context<A>, &mut CoreContext<'_>) -> Poll<Indicator<A::Message>> + 'static + Unpin,
deliver a message with the specified
function; delivery is delayed
for as long as f
returns Poll::Pending.
the function’s Output MUST be Indicator::Message;
if not, the error will be logged
and the output will be ignored
NOTE that the message bypasses the message queue and gets handled by Actor::action directly
sourcepub fn streaming<S>(&mut self, s: S) -> Handlewhere
S: CoreStream + 'static,
A: Stream<S::Item>,
pub fn streaming<S>(&mut self, s: S) -> Handle where S: CoreStream + 'static, A: Stream<S::Item>,
spawn a stream into the actor
NOTE that the stream item
will be processed by Stream::action
NOT Actor::action
NOTE that the message bypasses the message queue and gets handled by Stream::action directly
use crossbus::prelude::*;
struct St { items: Vec<i32> }
impl Stream for St {
// impl stream for St here
...
}
impl Actor for CrossBus {
fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
// spawn stream
let st = St {items: vec![1, 1, 1]};
ctx.streaming(st);
}
}
sourcepub fn streaming_message<S>(&mut self, s: S) -> Handlewhere
A: Actor<Message = S::Item> + MStream<S::Item>,
S: CoreStream + 'static,
S::Item: Message,
pub fn streaming_message<S>(&mut self, s: S) -> Handle where A: Actor<Message = S::Item> + MStream<S::Item>, S: CoreStream + 'static, S::Item: Message,
spawn a message stream into the actor
NOTE that the stream item is a message and
will be processed by Actor::action
NOT Stream::action
NOTE that the message bypasses the message queue and gets handled by Actor::action directly
use crossbus::prelude::*;
struct St { items: Vec<Num> }
impl Stream for St {
// impl stream for St here
...
}
impl Actor for CrossBus {
fn action(&mut self, msg: Self::Message, ctx: &mut Context<Self>) {
// spawn stream
let st = St {items: vec![Num(1), Num(1), Num(1)]};
ctx.streaming_message(st);
}
}
sourcepub fn abort_future(&mut self, handle: Handle)
pub fn abort_future(&mut self, handle: Handle)
abort a future
sourcepub fn state(&self) -> ActorState
pub fn state(&self) -> ActorState
get the current state of the actor
sourcepub fn set_state(&mut self, state: ActorState)
pub fn set_state(&mut self, state: ActorState)
set the state of the actor
sourcepub fn run(self, act: ActorGuard<A>) -> Addr<A::Message>
pub fn run(self, act: ActorGuard<A>) -> Addr<A::Message>
run the actor
sourcepub fn restart(&mut self)
pub fn restart(&mut self)
restart the context, it will
- flush all blockers
- flush all running tasks but the message queue will survive
sourcepub fn stop(&mut self)
pub fn stop(&mut self)
stop the context and the actor
it can be restored if Actor::state
returns ActingState::Resume
if not, the actor will be stopped
sourcepub fn downcast_ref<T: 'static>(&self, handle: Handle) -> Option<&T>
pub fn downcast_ref<T: 'static>(&self, handle: Handle) -> Option<&T>
downcast the inner future into
type &T
, where T
is one
of the following four types: