pub struct PostgresStorage<Args, Codec = JsonCodec<CompactType>, Fetcher = PgFetcher<CompactType, Codec>> { /* private fields */ }
PostgreSQL storage backend implemented with Diesel.
Implementations§
impl<Args> PostgresStorage<Args>
pub fn new(pool: &PgPool) -> Self
Create storage for the queue named after Args.
Do not share pool with HTTP request handlers or other unrelated
workloads. Apalis holds long-lived connections (fetcher, lifecycle
keep-alive, listener) and a backend that exhausts a shared pool will
stall the worker, causing heartbeat loss and orphan reenqueue
cascades. See the README section “Connection pool isolation” for the
recommended sizing and the Self::push_with_conn outbox API for the
supported way to enqueue from a backend transaction.
pub fn new_with_config(pool: &PgPool, config: &Config) -> Self
Create storage with an explicit Apalis SQL config.
Do not share pool with HTTP request handlers or other unrelated
workloads — see Self::new for the rationale.
pub fn new_with_notify(
    pool: &PgPool,
    config: &Config,
) -> PostgresStorage<Args, JsonCodec<CompactType>, PgNotify>
Create storage that also listens for PostgreSQL notifications.
Notify mode uses a dedicated pooled connection for LISTEN "apalis::job::insert" while the polling stream is alive. Each
new_with_notify storage spawns one listener thread and pins one
pool connection. If you need notify-driven dequeue across many
queues, prefer crate::SharedPostgresStorage — it spawns a single
listener thread shared by all queues registered with it, so the
thread/connection cost stays at one regardless of queue count.
Do not share pool with HTTP request handlers or other unrelated
workloads — see Self::new for the rationale.
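The thread and connection cost described for notify mode can be written down as a small model. This is only an illustrative sketch: `NotifyCost`, `per_queue_cost`, and `shared_cost` are hypothetical names that do not exist in the crate, and the model assumes at least one queue is registered before a shared listener is started.

```rust
/// Resources pinned by notify-driven dequeue (illustrative model only,
/// not a type from the crate).
#[derive(Debug, PartialEq, Eq)]
pub struct NotifyCost {
    pub listener_threads: usize,
    pub pinned_connections: usize,
}

/// One `new_with_notify` storage per queue: each storage spawns its own
/// listener thread and pins its own pool connection.
pub fn per_queue_cost(queues: usize) -> NotifyCost {
    NotifyCost {
        listener_threads: queues,
        pinned_connections: queues,
    }
}

/// A shared storage: a single listener serves every registered queue.
/// Assumption: no listener is counted when zero queues are registered.
pub fn shared_cost(queues: usize) -> NotifyCost {
    let listeners = queues.min(1);
    NotifyCost {
        listener_threads: listeners,
        pinned_connections: listeners,
    }
}
```

Under this model, eight queues cost eight threads and eight pinned connections with per-queue notify storages, but one of each with a shared listener, which is worth remembering when sizing the dedicated pool.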
impl<Args, Codec, Fetcher> PostgresStorage<Args, Codec, Fetcher>
pub fn with_codec<NewCodec>(self) -> PostgresStorage<Args, NewCodec, Fetcher>
Change the task codec while retaining pool, config, and fetcher.
impl<Args, EncodeCodec, Fetcher> PostgresStorage<Args, EncodeCodec, Fetcher>
Transactional enqueue on a caller-supplied connection: the outbox entry point. Use these methods when you want the task INSERT to share a transaction with your business-data writes.
pub fn push_with_conn(
    &self,
    conn: &mut PgConnection,
    args: Args,
) -> Result<PgTaskId, Error>
Enqueue a task using a caller-supplied PgConnection.
For transactional outbox semantics, call this inside
conn.transaction(|c| ...) together with your business-data writes —
the INSERT into apalis.jobs is committed only if the outer
transaction commits. Without an outer transaction, Diesel auto-commits
the INSERT (same behaviour as the pool-based Sink<Task> path).
NOTIFY is delivered when the (outer) transaction commits, so listeners
only see tasks that were actually committed. No manual pg_notify is
needed.
This method is synchronous and blocks on database I/O, as does
Self::push_task_with_conn. From an async context, invoke it inside
tokio::task::spawn_blocking together with your business-data writes so
the entire transaction lives on one blocking task.
See Self::push_task_with_conn for the full-control variant that
accepts a pre-built PgTask (custom idempotency_key, priority,
run_at, max_attempts, metadata, or task_id).
§Errors
Error::Decode if the codec rejects args.
Error::InvalidArgument if serialized metadata exceeds the byte cap, or for unreachable run_at.
Error::Database for SQL/driver failures.
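The commit-or-nothing contract described for push_with_conn can be pictured with a toy in-memory model. This sketch deliberately uses neither Diesel nor this crate: `FakeDb`, `FakeTx`, `write`, and `enqueue` are hypothetical names introduced only for illustration, and the model ignores savepoints, isolation levels, and real I/O.

```rust
/// Toy in-memory stand-in for a database (hypothetical, not crate API).
#[derive(Default, Debug)]
pub struct FakeDb {
    pub committed_rows: Vec<String>,
    pub delivered_notifications: Vec<String>,
}

/// Writes and notifications staged inside one open transaction.
#[derive(Default)]
pub struct FakeTx {
    staged_rows: Vec<String>,
    staged_notifications: Vec<String>,
}

impl FakeTx {
    /// Stage a business-data write.
    pub fn write(&mut self, row: &str) {
        self.staged_rows.push(row.to_string());
    }

    /// Stage a task INSERT together with the notification that goes with it.
    pub fn enqueue(&mut self, task: &str) {
        self.staged_rows.push(format!("task:{}", task));
        self.staged_notifications.push(task.to_string());
    }
}

impl FakeDb {
    /// Run `body` as a transaction: publish staged work on `Ok`,
    /// discard all of it on `Err`.
    pub fn transaction<T, E>(
        &mut self,
        body: impl FnOnce(&mut FakeTx) -> Result<T, E>,
    ) -> Result<T, E> {
        let mut tx = FakeTx::default();
        // An early return here drops `tx`, so nothing staged becomes visible.
        let out = body(&mut tx)?;
        self.committed_rows.append(&mut tx.staged_rows);
        // Notifications are released only at commit time.
        self.delivered_notifications
            .append(&mut tx.staged_notifications);
        Ok(out)
    }
}
```

In the model, a closure that stages an order row and a task and returns `Ok` makes both visible at once, while a closure that returns `Err` leaves no task row and delivers no notification. That is the property the outbox pattern relies on.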
pub fn push_task_with_conn(
    &self,
    conn: &mut PgConnection,
    task: PgTask<Args>,
) -> Result<PgTaskId, Error>
Enqueue a fully-constructed PgTask<Args> using a caller-supplied
connection. Use this when you need to set idempotency_key,
priority, run_at, max_attempts, metadata, or a specific
task_id.
Semantics are identical to Self::push_with_conn; see that method’s
docs for the transaction/NOTIFY contract.
If task.parts.task_id is None, a fresh Ulid is generated and
returned. If Some, that id is used as-is and echoed back.
§Errors
Error::Decode if the codec rejects the task’s args.
Error::IdempotencyConflict on idempotency_key conflict. The savepoint for this batch is rolled back (the whole batch, not just the duplicate), but your outer transaction continues; decide whether to commit or roll back based on the error.
Error::InvalidArgument if serialized metadata exceeds the byte cap.
Error::Database for SQL/driver failures.
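Because the outer transaction survives an idempotency conflict, the caller needs an explicit policy for what to do next. The sketch below shows one possible policy. `EnqueueError` is a hypothetical local mirror of the error kinds listed above (the crate's real `Error` type may carry payloads or further variants), and `TxDecision` and `decide` are likewise illustrative names.

```rust
/// Hypothetical mirror of the documented error kinds; not the crate's `Error`.
#[derive(Debug, PartialEq, Eq)]
pub enum EnqueueError {
    Decode,
    IdempotencyConflict,
    InvalidArgument,
    Database,
}

/// What the caller does with the outer transaction.
#[derive(Debug, PartialEq, Eq)]
pub enum TxDecision {
    Commit,
    Rollback,
}

/// One possible policy: a duplicate idempotency key means the task was
/// already enqueued, so keep the business-data writes; any other failure
/// rolls the whole outer transaction back.
pub fn decide<T>(result: &Result<T, EnqueueError>) -> TxDecision {
    match result {
        Ok(_) | Err(EnqueueError::IdempotencyConflict) => TxDecision::Commit,
        Err(_) => TxDecision::Rollback,
    }
}
```

Whether committing after a conflict is correct depends on the application. If the business writes must never land without a freshly inserted task, map the conflict to `Rollback` instead.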
Trait Implementations§
impl<Args, Decode, Fetcher> Backend for PostgresStorage<Args, Decode, Fetcher>
Single generic Backend impl covering every Fetcher: PgFetcherSource.
Heartbeat/middleware are identical for all three modes; the per-mode
pipeline is delegated through PgFetcherSource::into_compact_stream.
type Context = SqlContext<Pool<ConnectionManager<PgConnection>>>
type Stream = Pin<Box<dyn Stream<Item = Result<Option<Task<Args, SqlContext<Pool<ConnectionManager<PgConnection>>>, Ulid>>, Error>> + Send>>
type Beat = Pin<Box<dyn Stream<Item = Result<(), Error>> + Send>>
type Layer = PgMiddleware
fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat
fn middleware(&self) -> Self::Layer
fn poll(self, worker: &WorkerContext) -> Self::Stream
impl<Args, Decode, Fetcher> BackendExt for PostgresStorage<Args, Decode, Fetcher>
type CompactStream = Pin<Box<dyn Stream<Item = Result<Option<Task<Vec<u8>, SqlContext<Pool<ConnectionManager<PgConnection>>>, Ulid>>, <PostgresStorage<Args, Decode, Fetcher> as Backend>::Error>> + Send>>
fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream
impl<Args, Codec, Fetcher: Clone> Clone for PostgresStorage<Args, Codec, Fetcher>
impl<Args, Codec, Fetcher: Debug> Debug for PostgresStorage<Args, Codec, Fetcher>
impl<Args, D, F> FetchById<Args> for PostgresStorage<Args, D, F>
where
    PostgresStorage<Args, D, F>: BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
    D: Codec<Args, Compact = CompactType>,
    D::Error: Error + Send + Sync + 'static,
    Args: 'static,
impl<Args, D, F> ListAllTasks for PostgresStorage<Args, D, F>
where
    PostgresStorage<Args, D, F>: BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
impl<Args, D, F> ListQueues for PostgresStorage<Args, D, F>
where
    PostgresStorage<Args, D, F>: BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, D, F>
where
    PostgresStorage<Args, D, F>: BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
    D: Codec<Args, Compact = CompactType>,
    D::Error: Error + Send + Sync + 'static,
    Args: 'static,
impl<Args, D, F> ListWorkers for PostgresStorage<Args, D, F>
where
    Args: Sync,
    PostgresStorage<Args, D, F>: BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
fn list_workers(&self) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send
fn list_all_workers(&self) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send
impl<Args, D, F> Metrics for PostgresStorage<Args, D, F>
where
    PostgresStorage<Args, D, F>: BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send
Scans apalis.jobs to compute global statistics. Each call evaluates
20+ FILTER aggregates over every row; cost grows linearly with the
table size. Treat as a slow admin call.
fn fetch_by_queue(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send
Same shape as Self::global, scoped to the configured queue. Cost
still depends on the number of jobs in that job_type.
impl<Args, D, F> RegisterWorker for PostgresStorage<Args, D, F>
where
    PostgresStorage<Args, D, F>: BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
impl<Args, Encode, Fetcher> Sink<Task<Vec<u8>, SqlContext<Pool<ConnectionManager<PgConnection>>>, Ulid>> for PostgresStorage<Args, Encode, Fetcher>
fn poll_ready(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>
Attempts to prepare the Sink to receive a value.
fn start_send(
    self: Pin<&mut Self>,
    item: PgTask<CompactType>,
) -> Result<(), Self::Error>
Begin the process of sending a value to the sink. Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())).
impl<O, Args, F, Decode> WaitForCompletion<O> for PostgresStorage<Args, Decode, F>
where
    O: 'static + Send,
    PostgresStorage<Args, Decode, F>: BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = Error>,
    Result<O, String>: DeserializeOwned,
fn wait_for(
    &self,
    task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
) -> Self::ResultStream
Wait for the given tasks to complete, yielding each result as it lands.
§Error handling
A transient database error during polling does not abandon the
batch: the poll is retried with backoff. The stream yields an Err and
ends only once the failures persist across several consecutive polls.
Because completed results are durable in apalis.jobs, a surfaced error
is always safe to recover from by calling wait_for again with the ids
that have not yet yielded a result.
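The recovery rule above reduces to two pieces of bookkeeping: which ids still lack a result, and how many polls in a row have failed. The following is a minimal stdlib-only sketch of both, not the crate's implementation. Plain `u64` ids stand in for `Ulid`, and `remaining_ids`, `FailureStreak`, and the failure limit are hypothetical, since the documentation does not state how many consecutive failures are tolerated.

```rust
use std::collections::HashSet;

/// Ids to pass to a fresh `wait_for` call after the stream ended early:
/// everything requested that has not yet yielded a result.
pub fn remaining_ids(requested: &[u64], yielded: &HashSet<u64>) -> Vec<u64> {
    requested
        .iter()
        .copied()
        .filter(|id| !yielded.contains(id))
        .collect()
}

/// Tracks consecutive poll failures; any successful poll resets the streak.
pub struct FailureStreak {
    consecutive: u32,
    limit: u32,
}

impl FailureStreak {
    /// `limit` is an assumed threshold, not a value taken from the crate.
    pub fn new(limit: u32) -> Self {
        Self { consecutive: 0, limit }
    }

    /// Record one poll outcome. Returns `true` once failures have persisted
    /// for `limit` consecutive polls and the error should be surfaced.
    pub fn record(&mut self, poll_ok: bool) -> bool {
        if poll_ok {
            self.consecutive = 0;
        } else {
            self.consecutive += 1;
        }
        self.consecutive >= self.limit
    }
}
```

A caller that collects the ids of results it has already received can compute `remaining_ids` when the stream yields `Err` and resubscribe with only those ids. Results that already landed are not lost, because they are stored in apalis.jobs.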
type ResultStream = Pin<Box<dyn Stream<Item = Result<TaskResult<O, Ulid>, Error>> + Send>>
fn check_status(
    &self,
    task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
) -> impl Future<Output = Result<Vec<TaskResult<O, Ulid>>, Self::Error>> + Send
fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream
impl<Args, Codec, Fetcher: Unpin> Unpin for PostgresStorage<Args, Codec, Fetcher>
Auto Trait Implementations§
impl<Args, Codec = JsonCodec<Vec<u8>>, Fetcher = PgFetcher<Vec<u8>, Codec>> !Freeze for PostgresStorage<Args, Codec, Fetcher>
impl<Args, Codec = JsonCodec<Vec<u8>>, Fetcher = PgFetcher<Vec<u8>, Codec>> !RefUnwindSafe for PostgresStorage<Args, Codec, Fetcher>
impl<Args, Codec, Fetcher> Send for PostgresStorage<Args, Codec, Fetcher>
impl<Args, Codec, Fetcher> Sync for PostgresStorage<Args, Codec, Fetcher>
impl<Args, Codec, Fetcher> UnsafeUnpin for PostgresStorage<Args, Codec, Fetcher>
where
    Fetcher: UnsafeUnpin,
impl<Args, Codec = JsonCodec<Vec<u8>>, Fetcher = PgFetcher<Vec<u8>, Codec>> !UnwindSafe for PostgresStorage<Args, Codec, Fetcher>
Blanket Implementations§
impl<T> AggregateExpressionMethods for T
fn aggregate_distinct(self) -> Self::Output
where
    Self: DistinctDsl,
DISTINCT modifier for aggregate functions
fn aggregate_all(self) -> Self::Output
where
    Self: AllDsl,
ALL modifier for aggregate functions
fn aggregate_filter<P>(self, f: P) -> Self::Output
fn aggregate_order<O>(self, o: O) -> Self::Output
where
    Self: OrderAggregateDsl<O>,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Downcast for T
where
    T: Any,
fn into_any(self: Box<T>) -> Box<dyn Any>
Converts Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>, which can then be
downcast into Box<dyn ConcreteType> where ConcreteType implements Trait.
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Converts Rc<Trait> (where Trait: Downcast) to Rc<Any>, which can then be further
downcast into Rc<ConcreteType> where ConcreteType implements Trait.
fn as_any(&self) -> &(dyn Any + 'static)
Converts &Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &Any’s vtable from &Trait’s.
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
Converts &mut Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &mut Any’s vtable from &mut Trait’s.
impl<T> DowncastSend for T
impl<T> DowncastSync for T
impl<T> IntoSql for T
fn into_sql<T>(self) -> Self::Expression
Convert self to an expression for Diesel’s query builder.
fn as_sql<'a, T>(&'a self) -> <&'a Self as AsExpression<T>>::Expression
Convert &self to an expression for Diesel’s query builder.
impl<T, Item> SinkExt<Item> for T
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
Map this sink’s error to a different error type using the Into trait.
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
where
    Self: Sized,
fn flush(&mut self) -> Flush<'_, Self, Item>
where
    Self: Unpin,
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
where
    Self: Unpin,
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
where
    Self: Unpin,
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
fn right_sink<Si1>(self) -> Either<Si1, Self>
fn poll_ready_unpin(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>
where
    Self: Unpin,
A convenience method for calling Sink::poll_ready on Unpin sink types.
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
where
    Self: Unpin,
A convenience method for calling Sink::start_send on Unpin sink types.