pub struct JsonStorage<Args> { /* private fields */ }
Available on crate feature json only.
Expand description
A backend that persists to a file using JSON encoding.
Warning: This backend is not optimized for high-throughput scenarios and is best suited for development, testing, or low-volume workloads.
§Example
Creates a temporary JSON storage backend
let mut backend = JsonStorage::new_temp().unwrap();
§Features
| Feature | Status | Description |
|---|---|---|
| Backend | ✅ | Basic Backend functionality |
| TaskSink | ✅ | Ability to push new tasks |
| Serialization | ✅ ❗ | Serialization support for arguments. Only accepts json |
| Web Interface | ⚠️ | Expose a web interface for monitoring tasks |
| FetchById | ⚠️ | Allow fetching a task by its ID |
| RegisterWorker | ❌ | Allow registering a worker with the backend |
| PipeExt | ✅ | Allow other backends to pipe to this backend |
| MakeShared | ✅ | Share the same JSON storage across multiple workers via SharedJsonStore |
| Workflow | ✅ | Flexible enough to support workflows |
| WaitForCompletion | ✅ | Wait for tasks to complete without blocking |
| ResumeById | ⚠️ | Resume a task by its ID |
| ResumeAbandoned | ⚠️ | Resume abandoned tasks |
| ListWorkers | ❌ | List all workers registered with the backend |
| ListTasks | ⚠️ | List all tasks in the backend |
Key: ✅ : Supported | ⚠️ : Not implemented | ❌ : Not Supported | ❗ : Limited support
Tests:
§Backend
#[tokio::main]
async fn main() {
// let mut backend = /* snip */;
async fn task(task: u32, worker: WorkerContext) {
// Do some work
}
let worker = WorkerBuilder::new("backend-test")
.backend(backend)
.build(task);
let _ = worker.stream().take(1).collect::<Vec<_>>().await;
}
§TaskSink
#[tokio::main]
async fn main() {
// let mut backend = /* snip */;
backend.push(42).await.unwrap();
async fn task(task: u32, worker: WorkerContext) {
worker.stop().unwrap();
}
let worker = WorkerBuilder::new("task-sink-test")
.backend(backend)
.build(task);
worker.run().await.unwrap();
}
§Workflow
#[tokio::main]
async fn main() {
// let mut backend = /* snip */;
backend.push_start(42).await.unwrap();
async fn task1(task: u32, worker: WorkerContext) -> u32 {
task + 99
}
async fn task2(task: u32, worker: WorkerContext) -> u32 {
task + 1
}
async fn task3(task: u32, worker: WorkerContext) {
assert_eq!(task, 142);
worker.stop().unwrap();
}
let workflow = Workflow::new("test-workflow")
.and_then(task1)
.and_then(task2)
.and_then(task3);
let worker = WorkerBuilder::new("workflow-test")
.backend(backend)
.build(workflow);
worker.run().await.unwrap();
}
§WaitForCompletion
#[tokio::main]
async fn main() {
// let mut backend = /* snip */;
fn assert_wait_for_completion<B: WaitForCompletion<(), Args = u32>>(backend: B) {};
assert_wait_for_completion(backend);
}
Implementations§
Source§impl<Args> JsonStorage<Args>
impl<Args> JsonStorage<Args>
Sourcepub fn new(path: impl Into<PathBuf>) -> Result<Self>
pub fn new(path: impl Into<PathBuf>) -> Result<Self>
Creates a new JsonStorage instance using the specified file path.
Sourcepub fn remove(&mut self, key: &TaskKey) -> Result<Option<TaskWithMeta>>
pub fn remove(&mut self, key: &TaskKey) -> Result<Option<TaskWithMeta>>
Removes a task from the storage.
Sourcepub fn reload(&mut self) -> Result<()>
pub fn reload(&mut self) -> Result<()>
Reload data from disk, useful if the file was modified externally
Sourcepub fn update_status(
&mut self,
old_key: &TaskKey,
new_status: Status,
) -> Result<bool>
pub fn update_status( &mut self, old_key: &TaskKey, new_status: Status, ) -> Result<bool>
Update the status of an existing key
Trait Implementations§
Source§impl<Args> Backend for JsonStorage<Args>
impl<Args> Backend for JsonStorage<Args>
Source§type Stream = Pin<Box<dyn Stream<Item = Result<Option<Task<Args, Map<String, Value>>>, SendError>> + Send>>
type Stream = Pin<Box<dyn Stream<Item = Result<Option<Task<Args, Map<String, Value>>>, SendError>> + Send>>
A stream of tasks provided by the backend.
Source§type Layer = AcknowledgeLayer<JsonAck<Args>>
type Layer = AcknowledgeLayer<JsonAck<Args>>
The type representing backend middleware layer.
Source§type Beat = Pin<Box<dyn Stream<Item = Result<(), <JsonStorage<Args> as Backend>::Error>> + Send>>
type Beat = Pin<Box<dyn Stream<Item = Result<(), <JsonStorage<Args> as Backend>::Error>> + Send>>
A stream representing heartbeat signals.
Source§fn heartbeat(&self, _: &WorkerContext) -> Self::Beat
fn heartbeat(&self, _: &WorkerContext) -> Self::Beat
Returns a heartbeat stream for the given worker.
Source§fn middleware(&self) -> Self::Layer
fn middleware(&self) -> Self::Layer
Returns the backend’s middleware layer.
Source§fn poll(self, _worker: &WorkerContext) -> Self::Stream
fn poll(self, _worker: &WorkerContext) -> Self::Stream
Polls the backend for tasks for the given worker.
Source§impl<Args: 'static + Send + Serialize + DeserializeOwned + Unpin> BackendExt for JsonStorage<Args>
impl<Args: 'static + Send + Serialize + DeserializeOwned + Unpin> BackendExt for JsonStorage<Args>
Source§type CompactStream = Pin<Box<dyn Stream<Item = Result<Option<Task<<JsonStorage<Args> as BackendExt>::Compact, Map<String, Value>>>, SendError>> + Send>>
type CompactStream = Pin<Box<dyn Stream<Item = Result<Option<Task<<JsonStorage<Args> as BackendExt>::Compact, Map<String, Value>>>, SendError>> + Send>>
A stream of encoded tasks provided by the backend.
Source§fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream
fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream
Polls the backend for encoded tasks for the given worker.
Source§impl<Args> Clone for JsonStorage<Args>
impl<Args> Clone for JsonStorage<Args>
Source§impl<Args> ConfigExt for JsonStorage<Args>
impl<Args> ConfigExt for JsonStorage<Args>
Source§impl<Args: Debug> Debug for JsonStorage<Args>
impl<Args: Debug> Debug for JsonStorage<Args>
Source§impl<Args: Unpin + Serialize + DeserializeOwned> Sink<Task<Value, Map<String, Value>>> for JsonStorage<Args>
impl<Args: Unpin + Serialize + DeserializeOwned> Sink<Task<Value, Map<String, Value>>> for JsonStorage<Args>
Source§fn poll_ready(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>
fn poll_ready( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>
Attempts to prepare the Sink to receive a value. Read more
Source§fn start_send(
self: Pin<&mut Self>,
item: Task<Value, Map<String, Value>>,
) -> Result<(), Self::Error>
fn start_send( self: Pin<&mut Self>, item: Task<Value, Map<String, Value>>, ) -> Result<(), Self::Error>
Begin the process of sending a value to the sink.
Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())). Read more
Source§impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args>
impl<Args: DeserializeOwned + Unpin> Stream for JsonStorage<Args>
Source§impl<Res: 'static + DeserializeOwned + Send, Args> WaitForCompletion<Res> for JsonStorage<Args>
Available on crate feature sleep only.
impl<Res: 'static + DeserializeOwned + Send, Args> WaitForCompletion<Res> for JsonStorage<Args>
Available on crate feature sleep only.
Source§type ResultStream = Pin<Box<dyn Stream<Item = Result<TaskResult<Res>, SendError>> + Send>>
type ResultStream = Pin<Box<dyn Stream<Item = Result<TaskResult<Res>, SendError>> + Send>>
The result stream type yielding task results
Source§fn wait_for(
&self,
task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
) -> Self::ResultStream
fn wait_for( &self, task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>, ) -> Self::ResultStream
Wait for multiple tasks to complete, yielding results as they become available
Source§async fn check_status(
&self,
task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
) -> Result<Vec<TaskResult<Res>>, Self::Error>
async fn check_status( &self, task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send, ) -> Result<Vec<TaskResult<Res>>, Self::Error>
Check current status of tasks without waiting
Source§fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream
fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream
Wait for a single task to complete, yielding its result
Auto Trait Implementations§
impl<Args> Freeze for JsonStorage<Args>
impl<Args> !RefUnwindSafe for JsonStorage<Args>
impl<Args> Send for JsonStorage<Args>where
Args: Send,
impl<Args> Sync for JsonStorage<Args>where
Args: Sync,
impl<Args> Unpin for JsonStorage<Args>where
Args: Unpin,
impl<Args> !UnwindSafe for JsonStorage<Args>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T, Item> SinkExt<Item> for T
impl<T, Item> SinkExt<Item> for T
Source§fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
Composes a function in front of the sink. Read more
Source§fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
Composes a function in front of the sink. Read more
Source§fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
Transforms the error returned by the sink.
Source§fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
Map this sink’s error to a different error type using the Into trait. Read more
Source§fn buffer(self, capacity: usize) -> Buffer<Self, Item>where
Self: Sized,
fn buffer(self, capacity: usize) -> Buffer<Self, Item>where
Self: Sized,
Available on crate feature alloc only.
Adds a fixed-size buffer to the current sink. Read more
Source§fn flush(&mut self) -> Flush<'_, Self, Item>where
Self: Unpin,
fn flush(&mut self) -> Flush<'_, Self, Item>where
Self: Unpin,
Flush the sink, processing all pending items. Read more
Source§fn send(&mut self, item: Item) -> Send<'_, Self, Item>where
Self: Unpin,
fn send(&mut self, item: Item) -> Send<'_, Self, Item>where
Self: Unpin,
A future that completes after the given item has been fully processed
into the sink, including flushing. Read more
Source§fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>where
Self: Unpin,
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>where
Self: Unpin,
A future that completes after the given item has been received
by the sink. Read more
Source§fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
A future that completes after the given stream has been fully processed
into the sink, including flushing. Read more
Source§fn right_sink<Si1>(self) -> Either<Si1, Self>
fn right_sink<Si1>(self) -> Either<Si1, Self>
Source§fn poll_ready_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>where
Self: Unpin,
fn poll_ready_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>where
Self: Unpin,
A convenience method for calling Sink::poll_ready on Unpin sink types.
Source§fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>where
Self: Unpin,
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>where
Self: Unpin,
A convenience method for calling Sink::start_send on Unpin sink types.
Source§impl<T> StreamExt for T
impl<T> StreamExt for T
Source§fn next(&mut self) -> Next<'_, Self>where
Self: Unpin,
fn next(&mut self) -> Next<'_, Self>where
Self: Unpin,
Creates a future that resolves to the next item in the stream. Read more
Source§fn into_future(self) -> StreamFuture<Self>
fn into_future(self) -> StreamFuture<Self>
Source§fn map<T, F>(self, f: F) -> Map<Self, F>
fn map<T, F>(self, f: F) -> Map<Self, F>
Maps this stream’s items to a different type, returning a new stream of
the resulting type. Read more
Source§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
Creates a stream which gives the current iteration count as well as
the next value. Read more
Source§fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
Filters the values produced by this stream according to the provided
asynchronous predicate. Read more
Source§fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
Filters the values produced by this stream while simultaneously mapping
them to a different type according to the provided asynchronous closure. Read more
Source§fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
Computes from this stream’s items new items of a different type using
an asynchronous closure. Read more
Source§fn collect<C>(self) -> Collect<Self, C>
fn collect<C>(self) -> Collect<Self, C>
Transforms a stream into a collection, returning a
future representing the result of that computation. Read more
Source§fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
Converts a stream of pairs into a future, which
resolves to pair of containers. Read more
Source§fn concat(self) -> Concat<Self>
fn concat(self) -> Concat<Self>
Concatenate all items of a stream into a single extendable
destination, returning a future representing the end result. Read more
Source§fn count(self) -> Count<Self>where
Self: Sized,
fn count(self) -> Count<Self>where
Self: Sized,
Drives the stream to completion, counting the number of items. Read more
Source§fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
Execute an accumulating asynchronous computation over a stream,
collecting all the values into one final result. Read more
Source§fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
Execute predicate over asynchronous stream, and return true if any element in stream satisfied a predicate. Read more
Source§fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
Execute predicate over asynchronous stream, and return true if all elements in stream satisfied a predicate. Read more
Source§fn flatten(self) -> Flatten<Self>
fn flatten(self) -> Flatten<Self>
Flattens a stream of streams into just one continuous stream. Read more
Source§fn flatten_unordered(
self,
limit: impl Into<Option<usize>>,
) -> FlattenUnorderedWithFlowController<Self, ()>
fn flatten_unordered( self, limit: impl Into<Option<usize>>, ) -> FlattenUnorderedWithFlowController<Self, ()>
Available on crate feature alloc only.
Flattens a stream of streams into just one continuous stream. Polls inner streams produced by the base stream concurrently. Read more
Source§fn flat_map_unordered<U, F>(
self,
limit: impl Into<Option<usize>>,
f: F,
) -> FlatMapUnordered<Self, U, F>
fn flat_map_unordered<U, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> FlatMapUnordered<Self, U, F>
Available on crate feature alloc only.
Maps a stream like StreamExt::map but flattens nested Streams and polls them concurrently, yielding items in any order, as they are made available. Read more
Source§fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
Combinator similar to StreamExt::fold that holds internal state and produces a new stream. Read more
Source§fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
Skip elements on this stream while the provided asynchronous predicate resolves to true. Read more
Source§fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
Take elements from this stream while the provided asynchronous predicate resolves to true. Read more
Source§fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
Take elements from this stream until the provided future resolves. Read more
Source§fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
Runs this stream to completion, executing the provided asynchronous
closure for each element on the stream. Read more
Source§fn for_each_concurrent<Fut, F>(
self,
limit: impl Into<Option<usize>>,
f: F,
) -> ForEachConcurrent<Self, Fut, F>
fn for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> ForEachConcurrent<Self, Fut, F>
Available on crate feature alloc only.
Runs this stream to completion, executing the provided asynchronous closure for each element on the stream concurrently as elements become available. Read more
Source§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where
Self: Sized,
Creates a new stream of at most n items of the underlying stream. Read more
Source§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
Creates a new stream which skips n items of the underlying stream. Read more
Source§fn catch_unwind(self) -> CatchUnwind<Self>where
Self: Sized + UnwindSafe,
fn catch_unwind(self) -> CatchUnwind<Self>where
Self: Sized + UnwindSafe,
Available on crate feature std only.
Catches unwinding panics while polling the stream. Read more
Source§fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
Available on crate feature alloc only.
Wrap the stream in a Box, pinning it. Read more
Source§fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>where
Self: Sized + 'a,
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>where
Self: Sized + 'a,
Available on crate feature alloc only.
Wrap the stream in a Box, pinning it. Read more
Source§fn buffered(self, n: usize) -> Buffered<Self>
fn buffered(self, n: usize) -> Buffered<Self>
Available on crate feature alloc only.
An adaptor for creating a buffered list of pending futures. Read more
Source§fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
Available on crate feature alloc only.
An adaptor for creating a buffered list of pending futures (unordered). Read more
Source§fn zip<St>(self, other: St) -> Zip<Self, St>
fn zip<St>(self, other: St) -> Zip<Self, St>
An adapter for zipping two streams together. Read more
Source§fn peekable(self) -> Peekable<Self>where
Self: Sized,
fn peekable(self) -> Peekable<Self>where
Self: Sized,
Creates a new stream which exposes a peek method. Read more
Source§fn chunks(self, capacity: usize) -> Chunks<Self>where
Self: Sized,
fn chunks(self, capacity: usize) -> Chunks<Self>where
Self: Sized,
Available on crate feature alloc only.
An adaptor for chunking up items of the stream inside a vector. Read more
Source§fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>where
Self: Sized,
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>where
Self: Sized,
Available on crate feature alloc only.
An adaptor for chunking up ready items of the stream inside a vector. Read more
Source§fn forward<S>(self, sink: S) -> Forward<Self, S>
fn forward<S>(self, sink: S) -> Forward<Self, S>
Available on crate feature sink only.
A future that completes after the given stream has been fully processed into the sink and the sink has been flushed and closed. Read more
Source§fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
Available on crate features sink and alloc only.
Source§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
Do something with each item of this stream, afterwards passing it on. Read more
Source§fn left_stream<B>(self) -> Either<Self, B>
fn left_stream<B>(self) -> Either<Self, B>
Source§fn right_stream<B>(self) -> Either<B, Self>
fn right_stream<B>(self) -> Either<B, Self>
Source§fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>where
Self: Unpin,
fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>where
Self: Unpin,
A convenience method for calling Stream::poll_next on Unpin stream types.