Struct apalis_core::worker::Context
source · pub struct Context<W: Actor> { /* private fields */ }
Available on crate feature worker only.
Expand description
Context for execution of actor
Implementations§
source§impl<W: Actor> Context<W>
impl<W: Actor> Context<W>
sourcepub fn new(addr: Addr<W>, handle: JoinHandle<()>) -> Self
pub fn new(addr: Addr<W>, handle: JoinHandle<()>) -> Self
Default constructor
Examples found in repository?
src/worker/mod.rs (line 435)
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
/// Spawns the actor's message loop as a task and returns the actor's address.
///
/// The spawned task needs its own join handle to build its `Context`, but that
/// handle only exists after spawning. A oneshot channel bridges the gap: the
/// task first awaits the handle, and this function sends it right after the
/// spawn call.
pub async fn start(self) -> Addr<A> {
let address = self.address;
let mut receiver = self.receiver;
let mut actor = self.actor;
// Channel used to deliver the task's own join handle into the task.
let (handle_sender, handle_receiver) = oneshot::channel();
// Separate clone for the task; `address` itself is returned to the caller.
let move_addr = address.clone();
let actor_future = async move {
// Wait for the join handle sent below, after the task has been spawned.
#[allow(clippy::expect_used)]
let join_handle = handle_receiver
.await
.expect("Unreachable as the message is always sent.");
let mut ctx = Context::new(move_addr.clone(), join_handle);
actor.on_start(&mut ctx).await;
// Process envelopes one at a time until `recv` yields `None`
// (presumably once every sender has been dropped — confirm against the channel type).
while let Some(Envelope(mut message)) = receiver.recv().await {
EnvelopeProxy::handle(&mut *message, &mut actor, &mut ctx).await;
}
// NOTE(review): this is logged at error level whenever the loop ends,
// whatever the reason for the receiver finishing.
tracing::error!(actor = std::any::type_name::<A>(), "actor stopped");
actor.on_stop(&mut ctx).await;
// tokio::time::sleep(Duration::from_secs(1)).await;
// Restart
}
.in_current_span();
// Exactly one of the two spawn statements is compiled, selected by the
// `broker` feature; the broker variant also passes the actor id to the spawner.
#[cfg(not(feature = "broker"))]
let join_handle = task::spawn(actor_future);
#[cfg(feature = "broker")]
let join_handle = deadlock::spawn_task_with_actor_id(address.actor_id, actor_future);
// The send result is bound to `_error` and otherwise ignored.
// TODO: propagate the error.
let _error = handle_sender.send(join_handle);
address
}sourcepub fn addr(&self) -> Addr<W>
pub fn addr(&self) -> Addr<W>
Gets the address of the current worker
Examples found in repository?
src/worker/mod.rs (line 535)
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604
/// Builds a `Recipient` for messages of type `M` from this worker's address.
///
/// The address is obtained through [`Self::addr`] and then converted with its
/// own `recipient` method.
pub fn recipient<M>(&self) -> Recipient<M>
where
    M: Message + Send + 'static,
    W: ContextHandler<M>,
    M::Result: Send,
{
    let own_address = self.addr();
    own_address.recipient()
}
/// Sends `message` to this worker without waiting for it to be delivered.
///
/// The send runs on a separately spawned task whose handle is dropped right
/// away, so the outcome of the send is not observable by the caller.
pub fn notify<M>(&self, message: M)
where
    M: Message + Send + 'static,
    W: ContextHandler<M>,
    M::Result: Send,
{
    let target = self.addr();
    // Attach the caller's current tracing span before handing the future off.
    let delivery = async move { target.do_send(message).await }.in_current_span();
    drop(task::spawn(delivery));
}
/// Sends `message` to this actor once `later` has elapsed.
///
/// The wait and the send both happen on a separately spawned task whose handle
/// is dropped right away, so this call returns without waiting.
pub fn notify_later<M>(&self, message: M, later: Duration)
where
    M: Message + Send + 'static,
    W: Handler<M>,
    M::Result: Send,
{
    let target = self.addr();
    let delayed_delivery = async move {
        time::sleep(later).await;
        target.do_send(message).await
    }
    .in_current_span();
    drop(task::spawn(delayed_delivery));
}
/// Repeatedly sends a default-constructed `M` to this actor, sleeping for
/// `every` before each send.
///
/// The loop runs on a separately spawned task whose handle is dropped right
/// away. The loop has no exit condition of its own, and the result of each
/// send is discarded.
pub fn notify_every<M>(&self, every: Duration)
where
    M: Message + Default + Send + 'static,
    W: Handler<M>,
    M::Result: Send,
{
    let target = self.addr();
    let periodic_delivery = async move {
        loop {
            time::sleep(every).await;
            let _res = target.do_send(M::default()).await;
        }
    }
    .in_current_span();
    drop(task::spawn(periodic_delivery));
}
/// Forwards every item yielded by `stream` to this actor as a message.
///
/// Forwarding runs on a separately spawned task whose handle is dropped right
/// away; the task finishes once the stream yields `None`.
pub fn notify_with<M, S>(&self, mut stream: S)
where
    M: Message + Send + 'static,
    S: Stream<Item = M> + Unpin + Send + 'static,
    W: Handler<M>,
    M::Result: Send,
{
    let target = self.addr();
    let forwarder = async move {
        // Items are sent one at a time, in the order the stream produces them.
        while let Some(item) = stream.next().await {
            target.do_send(item).await;
        }
    }
    .in_current_span();
    drop(task::spawn(forwarder));
}sourcepub fn recipient<M>(&self) -> Recipient<M>where
M: Message + Send + 'static,
W: ContextHandler<M>,
M::Result: Send,
pub fn recipient<M>(&self) -> Recipient<M>where
M: Message + Send + 'static,
W: ContextHandler<M>,
M::Result: Send,
Gets a recipient for the current worker with the specified message type
Examples found in repository?
More examples
sourcepub fn notify<M>(&self, message: M)where
M: Message + Send + 'static,
W: ContextHandler<M>,
M::Result: Send,
pub fn notify<M>(&self, message: M)where
M: Message + Send + 'static,
W: ContextHandler<M>,
M::Result: Send,
Sends the specified message to the worker
sourcepub fn notify_later<M>(&self, message: M, later: Duration)where
M: Message + Send + 'static,
W: Handler<M>,
M::Result: Send,
pub fn notify_later<M>(&self, message: M, later: Duration)where
M: Message + Send + 'static,
W: Handler<M>,
M::Result: Send,
Sends the specified message to the actor after the given delay
sourcepub fn notify_every<M>(&self, every: Duration)where
M: Message + Default + Send + 'static,
W: Handler<M>,
M::Result: Send,
pub fn notify_every<M>(&self, every: Duration)where
M: Message + Default + Send + 'static,
W: Handler<M>,
M::Result: Send,
Sends a default-constructed message to the actor in a loop, waiting the specified duration before each send
sourcepub fn notify_with<M, S>(&self, stream: S)where
M: Message + Send + 'static,
S: Stream<Item = M> + Unpin + Send + 'static,
W: Handler<M>,
M::Result: Send,
pub fn notify_with<M, S>(&self, stream: S)where
M: Message + Send + 'static,
S: Stream<Item = M> + Unpin + Send + 'static,
W: Handler<M>,
M::Result: Send,
Notifies actor with items from stream
Examples found in repository?
src/worker/mod.rs (line 312)
300 301 302 303 304 305 306 307 308 309 310 311 312 313
async fn on_start(&mut self, ctx: &mut Context<Self>) {
    // Run the wrapped worker's own start hook first.
    <W as Worker>::on_start(self, ctx).await;
    // Errors are silenced for now: an `Err` from `consume` is turned into
    // `Ok(None)`, exactly like an empty poll, before being wrapped.
    let requests = self
        .consume()
        .then(|outcome| async move { Ok(outcome.ok().flatten()) })
        .map(JobRequestWrapper);
    // Feed the wrapped results back to this actor as messages.
    ctx.notify_with(Box::pin(requests));
}More examples
src/storage/worker.rs (line 114)
112 113 114 115 116 117 118 119 120 121 122 123 124
async fn on_start(&mut self, ctx: &mut Context<Self>) {
    // Keep-alive messages are paced by `config.keep_alive`.
    // To change this just modify the controller then restart.
    let keep_alive_ticks = interval(self.config.keep_alive);
    ctx.notify_with(KeepAliveStream::new(keep_alive_ticks));

    // One heartbeat stream per configured (pulse, duration) pair. This is what
    // sets up reactivation of orphaned jobs and scheduling for non-SQL
    // storages such as Redis.
    for (pulse, period) in self.config.heartbeats.iter() {
        // The first tick of each heartbeat is scheduled 17 ms from now.
        let first_tick = Instant::now() + Duration::from_millis(17);
        let pulse = pulse.clone();
        let ticks = interval_at(first_tick, *period);
        ctx.notify_with(HeartbeatStream::new(pulse, ticks));
    }
}