pub struct JsonStorage<Args> { /* private fields */ }Expand description
A backend that persists to a file using json encoding
§Features
| Feature | Status | Description |
|---|---|---|
TaskSink | ✅ | Ability to push new tasks |
Serialization | ✅ ❗ | Serialization support for arguments. Only accepts json |
FetchById | ⚠️ | Allow fetching a task by its ID |
RegisterWorker | ❌ | Allow registering a worker with the backend |
PipeExt | ✅ | Allow other backends to pipe to this backend |
MakeShared | ✅ | Share the same JSON storage across multiple workers |
Workflow | ✅ | Flexible enough to support workflows |
WaitForCompletion | ✅ | Wait for tasks to complete without blocking |
ResumeById | ⚠️ | Resume a task by its ID |
ResumeAbandoned | ⚠️ | Resume abandoned tasks |
ListWorkers | ❌ | List all workers registered with the backend |
ListTasks | ⚠️ | List all tasks in the backend |
Key: ✅ : Supported | ⚠️ : Not implemented | ❌ : Not Supported | ❗ Limited support
Examples:
§TaskSink Example
use apalis_core::backend::TaskSink;
use apalis_core::worker::context::WorkerContext;
use apalis_core::worker::builder::WorkerBuilder;
#[tokio::main]
async fn main() {
let mut backend = {
use apalis_core::backend::json::JsonStorage;
JsonStorage::new_temp().unwrap()
};
use apalis_core::backend::TaskSink;
backend.push(42).await.unwrap();
async fn task(task: u32, worker: WorkerContext) {
worker.stop().unwrap();
}
let worker = WorkerBuilder::new("rango-tango")
.backend(backend)
.build(task);
worker.run().await.unwrap();
}
Implementations§
Source§impl<Args> JsonStorage<Args>
impl<Args> JsonStorage<Args>
Sourcepub fn new(path: impl Into<PathBuf>) -> Result<Self>
pub fn new(path: impl Into<PathBuf>) -> Result<Self>
Creates a new JsonStorage instance using the specified file path.
Sourcepub fn new_temp() -> Result<JsonStorage<Args>, Error>
pub fn new_temp() -> Result<JsonStorage<Args>, Error>
Creates a new temporary JsonStorage instance.
Sourcepub fn remove(&mut self, key: &TaskKey) -> Result<Option<TaskWithMeta>>
pub fn remove(&mut self, key: &TaskKey) -> Result<Option<TaskWithMeta>>
Removes a task from the storage.
Sourcepub fn reload(&mut self) -> Result<()>
pub fn reload(&mut self) -> Result<()>
Reload data from disk, useful if the file was modified externally
Sourcepub fn update_status(
&mut self,
old_key: &TaskKey,
new_status: Status,
) -> Result<bool>
pub fn update_status( &mut self, old_key: &TaskKey, new_status: Status, ) -> Result<bool>
Update the status of an existing key
Trait Implementations§
Source§impl<Args: 'static + Send + DeserializeOwned + Unpin> Backend<Args> for JsonStorage<Args>
impl<Args: 'static + Send + DeserializeOwned + Unpin> Backend<Args> for JsonStorage<Args>
Source§type Stream = Pin<Box<dyn Stream<Item = Result<Option<Task<Args, Map<String, Value>>>, SendError>> + Send>>
type Stream = Pin<Box<dyn Stream<Item = Result<Option<Task<Args, Map<String, Value>>>, SendError>> + Send>>
A stream of tasks provided by the backend.
Source§type Layer = AcknowledgeLayer<JsonAck<Args>>
type Layer = AcknowledgeLayer<JsonAck<Args>>
The type representing backend middleware layer.
Source§type Beat = Pin<Box<dyn Stream<Item = Result<(), <JsonStorage<Args> as Backend<Args>>::Error>> + Send>>
type Beat = Pin<Box<dyn Stream<Item = Result<(), <JsonStorage<Args> as Backend<Args>>::Error>> + Send>>
A stream representing heartbeat signals.
Source§fn heartbeat(&self, _: &WorkerContext) -> Self::Beat
fn heartbeat(&self, _: &WorkerContext) -> Self::Beat
Returns a heartbeat stream for the given worker.
Source§fn middleware(&self) -> Self::Layer
fn middleware(&self) -> Self::Layer
Returns the backend’s middleware layer.
Source§fn poll(self, _worker: &WorkerContext) -> Self::Stream
fn poll(self, _worker: &WorkerContext) -> Self::Stream
Polls the backend for tasks for the given worker.
Source§impl<Args> Clone for JsonStorage<Args>
impl<Args> Clone for JsonStorage<Args>
Source§impl<Args: Debug> Debug for JsonStorage<Args>
impl<Args: Debug> Debug for JsonStorage<Args>
Source§impl<Args: Unpin + Serialize> Sink<Task<Args, Map<String, Value>>> for JsonStorage<Args>
impl<Args: Unpin + Serialize> Sink<Task<Args, Map<String, Value>>> for JsonStorage<Args>
Source§fn poll_ready(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>
fn poll_ready( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>
Attempts to prepare the
Sink to receive a value. Read moreSource§fn start_send(
self: Pin<&mut Self>,
item: Task<Args, Map<String, Value>>,
) -> Result<(), Self::Error>
fn start_send( self: Pin<&mut Self>, item: Task<Args, Map<String, Value>>, ) -> Result<(), Self::Error>
Begin the process of sending a value to the sink.
Each call to this function must be preceded by a successful call to
poll_ready which returned Poll::Ready(Ok(())). Read moreSource§impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args>
impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args>
Source§impl<Res: 'static + DeserializeOwned + Send, Compact> WaitForCompletion<Res, Compact> for JsonStorage<Compact>
impl<Res: 'static + DeserializeOwned + Send, Compact> WaitForCompletion<Res, Compact> for JsonStorage<Compact>
Source§type ResultStream = Pin<Box<dyn Stream<Item = Result<TaskResult<Res>, SendError>> + Send>>
type ResultStream = Pin<Box<dyn Stream<Item = Result<TaskResult<Res>, SendError>> + Send>>
The result stream type yielding task results
Source§fn wait_for(
&self,
task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
) -> Self::ResultStream
fn wait_for( &self, task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>, ) -> Self::ResultStream
Wait for multiple tasks to complete, yielding results as they become available
Source§async fn check_status(
&self,
task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
) -> Result<Vec<TaskResult<Res>>, Self::Error>
async fn check_status( &self, task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send, ) -> Result<Vec<TaskResult<Res>>, Self::Error>
Check current status of tasks without waiting
Source§fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream
fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream
Wait for a single task to complete, yielding its result
Auto Trait Implementations§
impl<Args> Freeze for JsonStorage<Args>
impl<Args> !RefUnwindSafe for JsonStorage<Args>
impl<Args> Send for JsonStorage<Args>where
Args: Send,
impl<Args> Sync for JsonStorage<Args>where
Args: Sync,
impl<Args> Unpin for JsonStorage<Args>where
Args: Unpin,
impl<Args> !UnwindSafe for JsonStorage<Args>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T, Item> SinkExt<Item> for T
impl<T, Item> SinkExt<Item> for T
Source§fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
Composes a function in front of the sink. Read more
Source§fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
Composes a function in front of the sink. Read more
Source§fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
Transforms the error returned by the sink.
Source§fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
Map this sink’s error to a different error type using the
Into trait. Read moreSource§fn buffer(self, capacity: usize) -> Buffer<Self, Item>where
Self: Sized,
fn buffer(self, capacity: usize) -> Buffer<Self, Item>where
Self: Sized,
Adds a fixed-size buffer to the current sink. Read more
Source§fn flush(&mut self) -> Flush<'_, Self, Item>where
Self: Unpin,
fn flush(&mut self) -> Flush<'_, Self, Item>where
Self: Unpin,
Flush the sink, processing all pending items. Read more
Source§fn send(&mut self, item: Item) -> Send<'_, Self, Item>where
Self: Unpin,
fn send(&mut self, item: Item) -> Send<'_, Self, Item>where
Self: Unpin,
A future that completes after the given item has been fully processed
into the sink, including flushing. Read more
Source§fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>where
Self: Unpin,
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>where
Self: Unpin,
A future that completes after the given item has been received
by the sink. Read more
Source§fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
A future that completes after the given stream has been fully processed
into the sink, including flushing. Read more
Source§fn right_sink<Si1>(self) -> Either<Si1, Self>
fn right_sink<Si1>(self) -> Either<Si1, Self>
Source§fn poll_ready_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>where
Self: Unpin,
fn poll_ready_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>where
Self: Unpin,
A convenience method for calling
Sink::poll_ready on Unpin
sink types.Source§fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>where
Self: Unpin,
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>where
Self: Unpin,
A convenience method for calling
Sink::start_send on Unpin
sink types.Source§impl<T> StreamExt for T
impl<T> StreamExt for T
Source§fn next(&mut self) -> Next<'_, Self>where
Self: Unpin,
fn next(&mut self) -> Next<'_, Self>where
Self: Unpin,
Creates a future that resolves to the next item in the stream. Read more
Source§fn into_future(self) -> StreamFuture<Self>
fn into_future(self) -> StreamFuture<Self>
Source§fn map<T, F>(self, f: F) -> Map<Self, F>
fn map<T, F>(self, f: F) -> Map<Self, F>
Maps this stream’s items to a different type, returning a new stream of
the resulting type. Read more
Source§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
Creates a stream which gives the current iteration count as well as
the next value. Read more
Source§fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
Filters the values produced by this stream according to the provided
asynchronous predicate. Read more
Source§fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
Filters the values produced by this stream while simultaneously mapping
them to a different type according to the provided asynchronous closure. Read more
Source§fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
Computes from this stream’s items new items of a different type using
an asynchronous closure. Read more
Source§fn collect<C>(self) -> Collect<Self, C>
fn collect<C>(self) -> Collect<Self, C>
Transforms a stream into a collection, returning a
future representing the result of that computation. Read more
Source§fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
Converts a stream of pairs into a future, which
resolves to pair of containers. Read more
Source§fn concat(self) -> Concat<Self>
fn concat(self) -> Concat<Self>
Concatenate all items of a stream into a single extendable
destination, returning a future representing the end result. Read more
Source§fn count(self) -> Count<Self>where
Self: Sized,
fn count(self) -> Count<Self>where
Self: Sized,
Drives the stream to completion, counting the number of items. Read more
Source§fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
Execute an accumulating asynchronous computation over a stream,
collecting all the values into one final result. Read more
Source§fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
Execute predicate over asynchronous stream, and return
true if any element in stream satisfied a predicate. Read moreSource§fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
Execute predicate over asynchronous stream, and return
true if all element in stream satisfied a predicate. Read moreSource§fn flatten(self) -> Flatten<Self>
fn flatten(self) -> Flatten<Self>
Flattens a stream of streams into just one continuous stream. Read more
Source§fn flatten_unordered(
self,
limit: impl Into<Option<usize>>,
) -> FlattenUnorderedWithFlowController<Self, ()>
fn flatten_unordered( self, limit: impl Into<Option<usize>>, ) -> FlattenUnorderedWithFlowController<Self, ()>
Flattens a stream of streams into just one continuous stream. Polls
inner streams produced by the base stream concurrently. Read more
Source§fn flat_map_unordered<U, F>(
self,
limit: impl Into<Option<usize>>,
f: F,
) -> FlatMapUnordered<Self, U, F>
fn flat_map_unordered<U, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> FlatMapUnordered<Self, U, F>
Maps a stream like
StreamExt::map but flattens nested Streams
and polls them concurrently, yielding items in any order, as they made
available. Read moreSource§fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
Combinator similar to
StreamExt::fold that holds internal state
and produces a new stream. Read moreSource§fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
Skip elements on this stream while the provided asynchronous predicate
resolves to
true. Read moreSource§fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
Take elements from this stream while the provided asynchronous predicate
resolves to
true. Read moreSource§fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
Take elements from this stream until the provided future resolves. Read more
Source§fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
Runs this stream to completion, executing the provided asynchronous
closure for each element on the stream. Read more
Source§fn for_each_concurrent<Fut, F>(
self,
limit: impl Into<Option<usize>>,
f: F,
) -> ForEachConcurrent<Self, Fut, F>
fn for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F>
Runs this stream to completion, executing the provided asynchronous
closure for each element on the stream concurrently as elements become
available. Read more
Source§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where
Self: Sized,
Creates a new stream of at most
n items of the underlying stream. Read moreSource§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
Creates a new stream which skips
n items of the underlying stream. Read moreSource§fn catch_unwind(self) -> CatchUnwind<Self>where
Self: Sized + UnwindSafe,
fn catch_unwind(self) -> CatchUnwind<Self>where
Self: Sized + UnwindSafe,
Catches unwinding panics while polling the stream. Read more
Source§fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
Wrap the stream in a Box, pinning it. Read more
Source§fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>where
Self: Sized + 'a,
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>where
Self: Sized + 'a,
Wrap the stream in a Box, pinning it. Read more
Source§fn buffered(self, n: usize) -> Buffered<Self>
fn buffered(self, n: usize) -> Buffered<Self>
An adaptor for creating a buffered list of pending futures. Read more
Source§fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
An adaptor for creating a buffered list of pending futures (unordered). Read more
Source§fn zip<St>(self, other: St) -> Zip<Self, St>
fn zip<St>(self, other: St) -> Zip<Self, St>
An adapter for zipping two streams together. Read more
Source§fn peekable(self) -> Peekable<Self>where
Self: Sized,
fn peekable(self) -> Peekable<Self>where
Self: Sized,
Creates a new stream which exposes a
peek method. Read moreSource§fn chunks(self, capacity: usize) -> Chunks<Self>where
Self: Sized,
fn chunks(self, capacity: usize) -> Chunks<Self>where
Self: Sized,
An adaptor for chunking up items of the stream inside a vector. Read more
Source§fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>where
Self: Sized,
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>where
Self: Sized,
An adaptor for chunking up ready items of the stream inside a vector. Read more
Source§fn forward<S>(self, sink: S) -> Forward<Self, S>
fn forward<S>(self, sink: S) -> Forward<Self, S>
A future that completes after the given stream has been fully processed
into the sink and the sink has been flushed and closed. Read more
Source§fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
Source§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
Do something with each item of this stream, afterwards passing it on. Read more
Source§fn left_stream<B>(self) -> Either<Self, B>
fn left_stream<B>(self) -> Either<Self, B>
Source§fn right_stream<B>(self) -> Either<B, Self>
fn right_stream<B>(self) -> Either<B, Self>
Source§fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>where
Self: Unpin,
fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>where
Self: Unpin,
A convenience method for calling
Stream::poll_next on Unpin
stream types.