Worker

Struct Worker 

Source
pub struct Worker<Args, Ctx, Backend, Svc, Middleware> { /* private fields */ }
Expand description

Core component responsible for task polling, execution, and lifecycle management.

§Example

Basic example:


#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let mut storage = MemoryStorage::new();
    for i in 0..5 {
        storage.push(i).await?;
    }

    async fn handler(task: u32) {
        println!("Processing task: {task}");
    }

    let worker = WorkerBuilder::new("worker-1")
        .backend(storage)
        .build(handler);

    worker.run().await?;
    Ok(())
}

See module level documentation for more details.

Implementations§

Source§

impl<Args, Ctx, B, Svc, M> Worker<Args, Ctx, B, Svc, M>

Source

pub fn new(name: String, backend: B, service: Svc, layers: M) -> Self

Build a worker that is ready for execution

Source§

impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>
where B: Backend<Args = Args>, S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static, B::Stream: Unpin + Send + 'static, B::Beat: Unpin + Send + 'static, Args: Send + 'static, B::Context: Send + 'static, B::Error: Into<BoxDynError> + Send + 'static, B::Layer: Layer<ReadinessService<TrackerService<S>>>, M: Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>, <M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service: Service<Task<Args, B::Context, B::IdType>> + Send + 'static, <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future: Send, <<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error: Into<BoxDynError> + Send + Sync + 'static, B::IdType: Send + 'static,

Source

pub async fn run(self) -> Result<(), WorkerError>

Run the worker until completion

§Example

#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let mut storage = MemoryStorage::new();
    for i in 0..5 {
        storage.push(i).await?;
    }

    async fn handler(task: u32) {
        println!("Processing task: {task}");
    }

    let worker = WorkerBuilder::new("worker-1")
        .backend(storage)
        .build(handler);

    worker.run().await?;
    Ok(())
}
Source

pub async fn run_with_ctx( self, ctx: &mut WorkerContext, ) -> Result<(), WorkerError>

Run the worker with the given context.

See run for an example.

Source

pub async fn run_until<Fut, Err>(self, signal: Fut) -> Result<(), WorkerError>
where Fut: Future<Output = Result<(), Err>> + Send + 'static, B: Send, M: Send, Err: Into<WorkerError> + Send + 'static,

Run the worker until a shutdown signal future is complete.

Source

pub async fn run_until_ctx<F, Fut>(self, fut: F) -> Result<(), WorkerError>
where F: FnOnce(WorkerContext) -> Fut + Send + 'static, Fut: Future<Output = Result<(), WorkerError>> + Send + 'static, B: Send, M: Send,

Run the worker until a shutdown signal future is complete.

Note: Using this function requires you to call ctx.stop() in the future to completely stop the worker.

This can also be very powerful with pausing and resuming the worker using the context.

§Example


#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let mut storage = MemoryStorage::new();
    for i in 0..5 {
        storage.push(i).await?;
    }
    async fn handler(task: u32) {
        println!("Processing task: {task}");
    }
    let worker = WorkerBuilder::new("worker-1")
        .backend(storage)
        .build(handler);
    worker.run_until_ctx(|ctx| async move {
        sleep(Duration::from_secs(1)).await;
        ctx.stop()?;
        Ok(())
    }).await?;
    Ok(())
}
Source

pub fn stream( self, ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M>

Returns a stream that will yield events as they occur within the worker’s lifecycle

§Example
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
    let mut storage = MemoryStorage::new();
    for i in 0..5 {
        storage.push(i).await?;
    }
    async fn handler(task: u32) {
        println!("Processing task: {task}");
    }
    let worker = WorkerBuilder::new("worker-1")
        .backend(storage)
        .build(handler);
    let mut stream = worker.stream();
    while let Some(evt) = stream.next().await {
        println!("Event: {:?}", evt);
    }
    Ok(())
}
Source

pub fn stream_with_ctx( self, ctx: &mut WorkerContext, ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M>

Returns a stream that will yield events as they occur within the worker’s lifecycle when provided with a WorkerContext.

See stream for an example.

Trait Implementations§

Source§

impl<Args, Ctx, B, Svc, Middleware> Debug for Worker<Args, Ctx, B, Svc, Middleware>
where Svc: Debug, B: Debug,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<Args, Ctx, Backend, Svc, Middleware> Freeze for Worker<Args, Ctx, Backend, Svc, Middleware>
where Backend: Freeze, Svc: Freeze, Middleware: Freeze,

§

impl<Args, Ctx, Backend, Svc, Middleware> !RefUnwindSafe for Worker<Args, Ctx, Backend, Svc, Middleware>

§

impl<Args, Ctx, Backend, Svc, Middleware> Send for Worker<Args, Ctx, Backend, Svc, Middleware>
where Backend: Send, Svc: Send, Middleware: Send, Args: Send, Ctx: Send,

§

impl<Args, Ctx, Backend, Svc, Middleware> Sync for Worker<Args, Ctx, Backend, Svc, Middleware>
where Backend: Sync, Svc: Sync, Middleware: Sync, Args: Sync, Ctx: Sync,

§

impl<Args, Ctx, Backend, Svc, Middleware> Unpin for Worker<Args, Ctx, Backend, Svc, Middleware>
where Backend: Unpin, Svc: Unpin, Middleware: Unpin, Args: Unpin, Ctx: Unpin,

§

impl<Args, Ctx, Backend, Svc, Middleware> !UnwindSafe for Worker<Args, Ctx, Backend, Svc, Middleware>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more