Struct apalis_core::worker::Context

source ·
pub struct Context<W: Actor> { /* private fields */ }
Available on crate feature worker only.
Expand description

Context for execution of actor

Implementations§

Default constructor

Examples found in repository?
src/worker/mod.rs (line 435)
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
    pub async fn start(self) -> Addr<A> {
        let address = self.address;
        let mut receiver = self.receiver;
        let mut actor = self.actor;
        let (handle_sender, handle_receiver) = oneshot::channel();
        let move_addr = address.clone();
        let actor_future = async move {
            #[allow(clippy::expect_used)]
            let join_handle = handle_receiver
                .await
                .expect("Unreachable as the message is always sent.");
            let mut ctx = Context::new(move_addr.clone(), join_handle);
            actor.on_start(&mut ctx).await;
            while let Some(Envelope(mut message)) = receiver.recv().await {
                EnvelopeProxy::handle(&mut *message, &mut actor, &mut ctx).await;
            }
            tracing::error!(actor = std::any::type_name::<A>(), "actor stopped");
            actor.on_stop(&mut ctx).await;
            // tokio::time::sleep(Duration::from_secs(1)).await;
            // Restart
        }
        .in_current_span();
        #[cfg(not(feature = "broker"))]
        let join_handle = task::spawn(actor_future);
        #[cfg(feature = "broker")]
        let join_handle = deadlock::spawn_task_with_actor_id(address.actor_id, actor_future);
        // TODO: propagate the error.
        let _error = handle_sender.send(join_handle);
        address
    }

Gets an address of current worker

Examples found in repository?
src/worker/mod.rs (line 535)
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
    pub fn recipient<M>(&self) -> Recipient<M>
    where
        M: Message + Send + 'static,
        W: ContextHandler<M>,
        M::Result: Send,
    {
        self.addr().recipient()
    }

    /// Sends worker specified message
    pub fn notify<M>(&self, message: M)
    where
        M: Message + Send + 'static,
        W: ContextHandler<M>,
        M::Result: Send,
    {
        let addr = self.addr();
        drop(task::spawn(
            async move { addr.do_send(message).await }.in_current_span(),
        ));
    }

    /// Sends actor specified message in some time
    pub fn notify_later<M>(&self, message: M, later: Duration)
    where
        M: Message + Send + 'static,
        W: Handler<M>,
        M::Result: Send,
    {
        let addr = self.addr();
        drop(task::spawn(
            async move {
                time::sleep(later).await;
                addr.do_send(message).await
            }
            .in_current_span(),
        ));
    }

    /// Sends actor specified message in a loop with specified duration
    pub fn notify_every<M>(&self, every: Duration)
    where
        M: Message + Default + Send + 'static,
        W: Handler<M>,
        M::Result: Send,
    {
        let addr = self.addr();
        drop(task::spawn(
            async move {
                loop {
                    time::sleep(every).await;
                    let _res = addr.do_send(M::default()).await;
                }
            }
            .in_current_span(),
        ));
    }

    /// Notifies actor with items from stream
    pub fn notify_with<M, S>(&self, mut stream: S)
    where
        M: Message + Send + 'static,
        S: Stream<Item = M> + Unpin + Send + 'static,
        W: Handler<M>,
        M::Result: Send,
    {
        let addr = self.addr();
        drop(task::spawn(
            async move {
                while let Some(item) = stream.next().await {
                    addr.do_send(item).await;
                }
            }
            .in_current_span(),
        ));
    }

Gets an recipient for current worker with specified message type

Examples found in repository?
src/worker/broker.rs (line 66)
65
66
67
    pub fn subscribe<M: BrokerMessage, A: Actor + ContextHandler<M>>(&self, ctx: &mut Context<A>) {
        self.subscribe_recipient(ctx.recipient::<M>())
    }
More examples
Hide additional examples
src/worker/deadlock.rs (line 145)
144
145
146
147
148
149
150
151
152
    async fn on_start(&mut self, ctx: &mut Context<Self>) {
        let recipient = ctx.recipient::<Reminder>();
        drop(task::spawn(async move {
            loop {
                recipient.send(Reminder).await;
                time::sleep(Duration::from_millis(100)).await
            }
        }));
    }

Sends worker specified message

Sends actor specified message in some time

Sends actor specified message in a loop with specified duration

Notifies actor with items from stream

Examples found in repository?
src/worker/mod.rs (line 312)
300
301
302
303
304
305
306
307
308
309
310
311
312
313
    async fn on_start(&mut self, ctx: &mut Context<Self>) {
        <W as Worker>::on_start(self, ctx).await;
        let jobs = self
            .consume()
            // errors are silenced for now
            .then(|r| async move {
                match r {
                    Ok(Some(job)) => Ok(Some(job)),
                    _ => Ok(None),
                }
            });
        let stream = jobs.map(|c| JobRequestWrapper(c));
        ctx.notify_with(Box::pin(stream));
    }
More examples
Hide additional examples
src/storage/worker.rs (line 114)
112
113
114
115
116
117
118
119
120
121
122
123
124
    async fn on_start(&mut self, ctx: &mut Context<Self>) {
        // To change this just modify the controller then restart.
        ctx.notify_with(KeepAliveStream::new(interval(self.config.keep_alive)));
        // Sets up reactivate orphaned jobs
        // Setup scheduling for non_sql storages eg Redis
        for (pulse, duration) in self.config.heartbeats.iter() {
            let start = Instant::now() + Duration::from_millis(17);
            ctx.notify_with(HeartbeatStream::new(
                pulse.clone(),
                interval_at(start, *duration),
            ));
        }
    }

Trait Implementations§

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

The alignment of pointer.
The type for initializers.
Initializes a with the given initializer. Read more
Dereferences the given pointer. Read more
Mutably dereferences the given pointer. Read more
Drops the object pointed to by the given pointer. Read more
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more