pub struct RedisStorage<Args, Conn = ConnectionManager, C = JsonCodec<Vec<u8>>> { /* private fields */ }
Expand description
Represents a Backend that uses Redis for storage.
§Feature Support
§Features
| Feature | Status | Description |
|---|---|---|
| TaskSink | ✅ | Ability to push new tasks |
| MakeShared | ✅ | Share the same connection across multiple workers |
| Workflow | ✅ | Supports workflows and orchestration |
| Web Interface | ✅ | Supports apalis-board for monitoring and managing tasks |
| WaitForCompletion | ✅ | Wait for tasks to complete without blocking |
| Serialization | ✅ | Supports multiple serialization formats such as JSON and MessagePack |
| RegisterWorker | ✅ | Allow registering a worker with the backend |
| ResumeAbandoned | ✅ | Resume abandoned tasks |
Key: ✅ : Supported | ⚠️ : Not implemented | ❌ : Not Supported | ❗ : Limited support
Tests:
§TaskSink
#[tokio::main]
async fn main() {
// let mut backend = /* snip */;
backend.push(42).await.unwrap();
async fn task(task: u32, worker: WorkerContext) {
worker.stop().unwrap();
}
let worker = WorkerBuilder::new("task-sink-test")
.backend(backend)
.build(task);
worker.run().await.unwrap();
}
§Workflow
#[tokio::main]
async fn main() {
// let mut backend = /* snip */;
backend.push_start(42).await.unwrap();
async fn task1(task: u32, worker: WorkerContext) -> u32 {
task + 99
}
async fn task2(task: u32, worker: WorkerContext) -> u32 {
task + 1
}
async fn task3(task: u32, worker: WorkerContext) {
assert_eq!(task, 142);
worker.stop().unwrap();
}
let workflow = Workflow::new("test-workflow")
.and_then(task1)
.and_then(task2)
.and_then(task3);
let worker = WorkerBuilder::new("workflow-test")
.backend(backend)
.build(workflow);
worker.run().await.unwrap();
}
§WebUI
#[tokio::main]
async fn main() {
// let mut backend = /* snip */;
fn assert_web_ui<B: Expose<u32>>(backend: B) {};
assert_web_ui(backend);
}
§WaitForCompletion
#[tokio::main]
async fn main() {
// let mut backend = /* snip */;
fn assert_wait_for_completion<B: WaitForCompletion<(), Args = u32>>(backend: B) {};
assert_wait_for_completion(backend);
}
Implementations§
Source§impl<Args, Conn, C> RedisStorage<Args, Conn, C>
impl<Args, Conn, C> RedisStorage<Args, Conn, C>
Sourcepub async fn fetch_next(
worker: &WorkerContext,
config: &RedisConfig,
conn: &mut Conn,
) -> Result<Vec<Task<Vec<u8>, RedisContext, Ulid>>, RedisError>
pub async fn fetch_next( worker: &WorkerContext, config: &RedisConfig, conn: &mut Conn, ) -> Result<Vec<Task<Vec<u8>, RedisContext, Ulid>>, RedisError>
Fetches the next batch of tasks for the given worker.
Source§impl<T, Conn: Clone> RedisStorage<T, Conn, JsonCodec<Vec<u8>>>
impl<T, Conn: Clone> RedisStorage<T, Conn, JsonCodec<Vec<u8>>>
Sourcepub fn new_with_config(
conn: Conn,
config: RedisConfig,
) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>>
pub fn new_with_config( conn: Conn, config: RedisConfig, ) -> RedisStorage<T, Conn, JsonCodec<Vec<u8>>>
Start a connection with a custom config
Sourcepub fn new_with_codec<K>(
conn: Conn,
config: RedisConfig,
) -> RedisStorage<T, Conn, K>
pub fn new_with_codec<K>( conn: Conn, config: RedisConfig, ) -> RedisStorage<T, Conn, K>
Start a new connection providing custom config and a codec
Sourcepub fn get_connection(&self) -> &Conn
pub fn get_connection(&self) -> &Conn
Get current connection
Sourcepub fn get_config(&self) -> &RedisConfig
pub fn get_config(&self) -> &RedisConfig
Get the config used by the storage
Trait Implementations§
Source§impl<Args, Conn, C> Backend for RedisStorage<Args, Conn, C>
impl<Args, Conn, C> Backend for RedisStorage<Args, Conn, C>
Source§type Stream = Pin<Box<dyn Stream<Item = Result<Option<Task<Args, RedisContext, Ulid>>, RedisError>> + Send>>
type Stream = Pin<Box<dyn Stream<Item = Result<Option<Task<Args, RedisContext, Ulid>>, RedisError>> + Send>>
A stream of tasks provided by the backend.
Source§type Error = RedisError
type Error = RedisError
The error type returned by backend operations
Source§type Layer = AcknowledgeLayer<RedisAck<Conn, C>>
type Layer = AcknowledgeLayer<RedisAck<Conn, C>>
The type representing backend middleware layer.
Source§type Context = RedisContext
type Context = RedisContext
Context associated with each task.
Source§type Beat = Pin<Box<dyn Stream<Item = Result<(), <RedisStorage<Args, Conn, C> as Backend>::Error>> + Send>>
type Beat = Pin<Box<dyn Stream<Item = Result<(), <RedisStorage<Args, Conn, C> as Backend>::Error>> + Send>>
A stream representing heartbeat signals.
Source§fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat
fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat
Returns a heartbeat stream for the given worker.
Source§fn middleware(&self) -> Self::Layer
fn middleware(&self) -> Self::Layer
Returns the backend’s middleware layer.
Source§fn poll(self, worker: &WorkerContext) -> Self::Stream
fn poll(self, worker: &WorkerContext) -> Self::Stream
Polls the backend for tasks for the given worker.
Source§impl<Args, Conn, C> BackendExt for RedisStorage<Args, Conn, C>
impl<Args, Conn, C> BackendExt for RedisStorage<Args, Conn, C>
Source§type CompactStream = Pin<Box<dyn Stream<Item = Result<Option<Task<<RedisStorage<Args, Conn, C> as BackendExt>::Compact, RedisContext, Ulid>>, RedisError>> + Send>>
type CompactStream = Pin<Box<dyn Stream<Item = Result<Option<Task<<RedisStorage<Args, Conn, C> as BackendExt>::Compact, RedisContext, Ulid>>, RedisError>> + Send>>
A stream of encoded tasks provided by the backend.
Source§fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream
fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream
Polls the backend for encoded tasks for the given worker.
Source§impl<Args: Sync, Conn, C> ConfigExt for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
impl<Args: Sync, Conn, C> ConfigExt for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
Source§impl<Args, Conn, C> FetchById<Args> for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send,
Conn: ConnectionLike + Send,
impl<Args, Conn, C> FetchById<Args> for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send,
Conn: ConnectionLike + Send,
Source§impl<Args, Conn, C> ListAllTasks for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send + Sync,
Conn: ConnectionLike + Send + Sync + Clone,
impl<Args, Conn, C> ListAllTasks for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send + Sync,
Conn: ConnectionLike + Send + Sync + Clone,
Source§impl<Args, Conn, C> ListQueues for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send,
Conn: ConnectionLike + Send + Clone,
impl<Args, Conn, C> ListQueues for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send,
Conn: ConnectionLike + Send + Clone,
Source§impl<Args, Conn, C> ListTasks<Args> for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send + Sync,
Conn: ConnectionLike + Send + Clone + Sync,
impl<Args, Conn, C> ListTasks<Args> for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send + Sync,
Conn: ConnectionLike + Send + Clone + Sync,
Source§impl<Args, Conn, C> ListWorkers for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send + Sync,
Conn: ConnectionLike + Send + Clone,
impl<Args, Conn, C> ListWorkers for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send + Sync,
Conn: ConnectionLike + Send + Clone,
Source§fn list_workers(
&self,
queue: &str,
) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send
fn list_workers( &self, queue: &str, ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send
List all registered workers in the current queue
Source§fn list_all_workers(
&self,
) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send
fn list_all_workers( &self, ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send
List all registered workers in all queues
Source§impl<Args, Conn, C> Metrics for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send + Sync,
Conn: ConnectionLike + Send + Clone + Sync,
impl<Args, Conn, C> Metrics for RedisStorage<Args, Conn, C>where
RedisStorage<Args, Conn, C>: BackendExt<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = RedisError>,
C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
C::Error: Error + Send + Sync + 'static,
Args: 'static + Send + Sync,
Conn: ConnectionLike + Send + Clone + Sync,
Source§impl<Args, Cdc, Conn> Sink<Task<Vec<u8>, RedisContext, Ulid>> for RedisStorage<Args, Conn, Cdc>
impl<Args, Cdc, Conn> Sink<Task<Vec<u8>, RedisContext, Ulid>> for RedisStorage<Args, Conn, Cdc>
Source§type Error = RedisError
type Error = RedisError
The type of value produced by the sink when an error occurs.
Source§fn poll_ready(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>
fn poll_ready( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>
Attempts to prepare the Sink to receive a value. Read more
Source§fn start_send(
self: Pin<&mut Self>,
item: Task<Vec<u8>, RedisContext, Ulid>,
) -> Result<(), Self::Error>
fn start_send( self: Pin<&mut Self>, item: Task<Vec<u8>, RedisContext, Ulid>, ) -> Result<(), Self::Error>
Begin the process of sending a value to the sink.
Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())). Read more
Source§impl<Res, Args, Conn, Decode, Err> WaitForCompletion<Res> for RedisStorage<Args, Conn, Decode> where
Args: Unpin + Send + Sync + 'static,
Conn: Clone + ConnectionLike + Send + Sync + 'static,
Decode: Codec<Args, Compact = Vec<u8>, Error = Err> + Codec<Result<Res, String>, Compact = Vec<u8>, Error = Err> + Send + Sync + Unpin + 'static + Clone,
Err: Into<BoxDynError> + Send + 'static,
Res: Send + 'static,
impl<Res, Args, Conn, Decode, Err> WaitForCompletion<Res> for RedisStorage<Args, Conn, Decode>where
Args: Unpin + Send + Sync + 'static,
Conn: Clone + ConnectionLike + Send + Sync + 'static,
Decode: Codec<Args, Compact = Vec<u8>, Error = Err> + Codec<Result<Res, String>, Compact = Vec<u8>, Error = Err> + Send + Sync + Unpin + 'static + Clone,
Err: Into<BoxDynError> + Send + 'static,
Res: Send + 'static,
Source§type ResultStream = Pin<Box<dyn Stream<Item = Result<TaskResult<Res>, <RedisStorage<Args, Conn, Decode> as Backend>::Error>> + Send>>
type ResultStream = Pin<Box<dyn Stream<Item = Result<TaskResult<Res>, <RedisStorage<Args, Conn, Decode> as Backend>::Error>> + Send>>
The result stream type yielding task results
Source§fn wait_for(
&self,
task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
) -> Self::ResultStream
fn wait_for( &self, task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>, ) -> Self::ResultStream
Wait for multiple tasks to complete, yielding results as they become available
Source§async fn check_status(
&self,
task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
) -> Result<Vec<TaskResult<Res>>, Self::Error>
async fn check_status( &self, task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send, ) -> Result<Vec<TaskResult<Res>>, Self::Error>
Check current status of tasks without waiting
Source§fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream
fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream
Wait for a single task to complete, yielding its result
Auto Trait Implementations§
impl<Args, Conn, C> Freeze for RedisStorage<Args, Conn, C>where
Conn: Freeze,
impl<Args, Conn = ConnectionManager, C = JsonCodec<Vec<u8>>> !RefUnwindSafe for RedisStorage<Args, Conn, C>
impl<Args, Conn, C> Send for RedisStorage<Args, Conn, C>
impl<Args, Conn, C> Sync for RedisStorage<Args, Conn, C>
impl<Args, Conn, C> Unpin for RedisStorage<Args, Conn, C>
impl<Args, Conn = ConnectionManager, C = JsonCodec<Vec<u8>>> !UnwindSafe for RedisStorage<Args, Conn, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
Source§impl<T, Item> SinkExt<Item> for T
impl<T, Item> SinkExt<Item> for T
Source§fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
Composes a function in front of the sink. Read more
Source§fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
Composes a function in front of the sink. Read more
Source§fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
Transforms the error returned by the sink.
Source§fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
Map this sink’s error to a different error type using the Into trait. Read more
Source§fn buffer(self, capacity: usize) -> Buffer<Self, Item> where
Self: Sized,
fn buffer(self, capacity: usize) -> Buffer<Self, Item>where
Self: Sized,
Available on crate feature alloc only.
Adds a fixed-size buffer to the current sink. Read more
Source§fn flush(&mut self) -> Flush<'_, Self, Item>where
Self: Unpin,
fn flush(&mut self) -> Flush<'_, Self, Item>where
Self: Unpin,
Flush the sink, processing all pending items. Read more
Source§fn send(&mut self, item: Item) -> Send<'_, Self, Item>where
Self: Unpin,
fn send(&mut self, item: Item) -> Send<'_, Self, Item>where
Self: Unpin,
A future that completes after the given item has been fully processed
into the sink, including flushing. Read more
Source§fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>where
Self: Unpin,
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>where
Self: Unpin,
A future that completes after the given item has been received
by the sink. Read more
Source§fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
A future that completes after the given stream has been fully processed
into the sink, including flushing. Read more
Source§fn right_sink<Si1>(self) -> Either<Si1, Self>
fn right_sink<Si1>(self) -> Either<Si1, Self>
Source§fn poll_ready_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>where
Self: Unpin,
fn poll_ready_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>where
Self: Unpin,
A convenience method for calling Sink::poll_ready on Unpin sink types.
Source§fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> where
Self: Unpin,
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>where
Self: Unpin,
A convenience method for calling Sink::start_send on Unpin sink types.