pub struct Worker<Args, Ctx, Backend, Svc, Middleware> { /* private fields */ }Expand description
Core component responsible for task polling, execution, and lifecycle management.
§Example
Basic example:
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
let mut storage = MemoryStorage::new();
for i in 0..5 {
storage.push(i).await?;
}
async fn handler(task: u32) {
println!("Processing task: {task}");
}
let worker = WorkerBuilder::new("worker-1")
.backend(storage)
.build(handler);
worker.run().await?;
Ok(())
}See module level documentation for more details.
Implementations§
Source§impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>where
B: Backend<Args = Args>,
S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
B::Stream: Unpin + Send + 'static,
B::Beat: Unpin + Send + 'static,
Args: Send + 'static,
B::Context: Send + 'static,
B::Error: Into<BoxDynError> + Send + 'static,
B::Layer: Layer<ReadinessService<TrackerService<S>>>,
M: Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>,
<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
<<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future: Send,
<<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error: Into<BoxDynError> + Send + Sync + 'static,
B::IdType: Send + 'static,
impl<Args, S, B, M> Worker<Args, B::Context, B, S, M>where
B: Backend<Args = Args>,
S: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
B::Stream: Unpin + Send + 'static,
B::Beat: Unpin + Send + 'static,
Args: Send + 'static,
B::Context: Send + 'static,
B::Error: Into<BoxDynError> + Send + 'static,
B::Layer: Layer<ReadinessService<TrackerService<S>>>,
M: Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>,
<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service: Service<Task<Args, B::Context, B::IdType>> + Send + 'static,
<<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Future: Send,
<<M as Layer<<B::Layer as Layer<ReadinessService<TrackerService<S>>>>::Service>>::Service as Service<Task<Args, B::Context, B::IdType>>>::Error: Into<BoxDynError> + Send + Sync + 'static,
B::IdType: Send + 'static,
Sourcepub async fn run(self) -> Result<(), WorkerError>
pub async fn run(self) -> Result<(), WorkerError>
Run the worker until completion
§Example
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
let mut storage = MemoryStorage::new();
for i in 0..5 {
storage.push(i).await?;
}
async fn handler(task: u32) {
println!("Processing task: {task}");
}
let worker = WorkerBuilder::new("worker-1")
.backend(storage)
.build(handler);
worker.run().await?;
Ok(())
}Sourcepub async fn run_with_ctx(
self,
ctx: &mut WorkerContext,
) -> Result<(), WorkerError>
pub async fn run_with_ctx( self, ctx: &mut WorkerContext, ) -> Result<(), WorkerError>
Run the worker with the given context.
See run for an example.
Sourcepub async fn run_until<Fut, Err>(self, signal: Fut) -> Result<(), WorkerError>
pub async fn run_until<Fut, Err>(self, signal: Fut) -> Result<(), WorkerError>
Run the worker until a shutdown signal future is complete.
Sourcepub async fn run_until_ctx<F, Fut>(self, fut: F) -> Result<(), WorkerError>where
F: FnOnce(WorkerContext) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), WorkerError>> + Send + 'static,
B: Send,
M: Send,
pub async fn run_until_ctx<F, Fut>(self, fut: F) -> Result<(), WorkerError>where
F: FnOnce(WorkerContext) -> Fut + Send + 'static,
Fut: Future<Output = Result<(), WorkerError>> + Send + 'static,
B: Send,
M: Send,
Run the worker until a shutdown signal future is complete.
Note: Using this function requires you to call ctx.stop() in the future to completely stop the worker.
This can also be very powerful with pausing and resuming the worker using the context.
§Example
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
let mut storage = MemoryStorage::new();
for i in 0..5 {
storage.push(i).await?;
}
async fn handler(task: u32) {
println!("Processing task: {task}");
}
let worker = WorkerBuilder::new("worker-1")
.backend(storage)
.build(handler);
worker.run_until_ctx(|ctx| async move {
sleep(Duration::from_secs(1)).await;
ctx.stop()?;
Ok(())
}).await?;
Ok(())
}Sourcepub fn stream(
self,
) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M>
pub fn stream( self, ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M>
Returns a stream that will yield events as they occur within the worker’s lifecycle
§Example
#[tokio::main]
async fn main() -> Result<(), BoxDynError> {
let mut storage = MemoryStorage::new();
for i in 0..5 {
storage.push(i).await?;
}
async fn handler(task: u32) {
println!("Processing task: {task}");
}
let worker = WorkerBuilder::new("worker-1")
.backend(storage)
.build(handler);
let mut stream = worker.stream();
while let Some(evt) = stream.next().await {
println!("Event: {:?}", evt);
}
Ok(())
}Sourcepub fn stream_with_ctx(
self,
ctx: &mut WorkerContext,
) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M>
pub fn stream_with_ctx( self, ctx: &mut WorkerContext, ) -> impl Stream<Item = Result<Event, WorkerError>> + use<Args, S, B, M>
Returns a stream that will yield events as they occur within the worker’s lifecycle when provided
with a WorkerContext.
See stream for an example.