pub struct OutboxQueue { /* private fields */ }
Queue handle. Cheap to clone; all state is either on disk or inside
an Arc<Atomic*> so multiple callers inside the same process observe
the same breaker state.
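The sharing described above can be illustrated with a minimal sketch. It assumes the handle holds something like an `Arc<AtomicU32>` failure counter; `HandleSketch` and its field and method names are hypothetical and are not the crate's actual private fields.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the queue handle. Only a shared
/// failure counter is modelled; the real private fields are not shown.
#[derive(Clone)]
pub struct HandleSketch {
    // Cloning the handle clones the Arc (a reference-count bump),
    // so every clone points at the same counter.
    consecutive_failures: Arc<AtomicU32>,
}

impl HandleSketch {
    pub fn new() -> Self {
        Self {
            consecutive_failures: Arc::new(AtomicU32::new(0)),
        }
    }

    /// Record one failure and return the updated count.
    pub fn record_failure(&self) -> u32 {
        self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Read the current count as seen by any clone.
    pub fn failures(&self) -> u32 {
        self.consecutive_failures.load(Ordering::SeqCst)
    }
}
```

A failure recorded through one clone is visible through every other clone, which is the property the breaker relies on.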
Implementations
impl OutboxQueue
pub fn new(pool: SqlitePool) -> Self
Build a queue handle from an existing pool. The pool must have the
cloud_outbox migration applied (i.e. init_db was called on it).
pub async fn enqueue( &self, kind: &str, payload_json: &str, ) -> Result<i64, Error>
Insert a new fire-and-forget payload. Returns the row id so tests (and the occasional curious caller) can trace a specific item. Production callers usually ignore the returned id.
pub fn circuit_state(&self) -> CircuitState
Current breaker state. Closed ⇒ callers may proceed. Open ⇒
callers should short-circuit. Check this before building
expensive payloads for bulk drains.
pub async fn claim_next(&self) -> Result<Option<OutboxItem>, Error>
Atomically pick the oldest pending row and flip it to
processing. Returns None when the queue is empty or when the
circuit breaker is open.
The UPDATE uses RETURNING so claim-and-read happen in one
statement (SQLite allows only one writer at a time per database,
so this is equivalent to SELECT … FOR UPDATE on a row-at-a-time queue).
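The claim semantics can be sketched as an in-memory model. This is not the SQL the crate runs: `Row`, `Status`, and the free function `claim_next` below are hypothetical, and "oldest" is assumed to mean the lowest row id.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Processing,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Row {
    pub id: i64,
    pub status: Status,
}

/// Claim the oldest pending row (assumed here to be the lowest id).
/// Returns None when the breaker is open or nothing is pending.
pub fn claim_next(rows: &mut [Row], breaker_open: bool) -> Option<Row> {
    if breaker_open {
        return None;
    }
    let row = rows
        .iter_mut()
        .filter(|r| r.status == Status::Pending)
        .min_by_key(|r| r.id)?;
    // Flip and read in one step, mirroring UPDATE ... RETURNING.
    row.status = Status::Processing;
    Some(row.clone())
}
```

Because the status flip and the read happen in one step, a second caller cannot claim a row that has already been handed out.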
pub async fn claim_next_kind( &self, kind: &str, ) -> Result<Option<OutboxItem>, Error>
pub async fn confirm(&self, id: i64) -> Result<(), Error>
Upload succeeded: delete the row and reset the consecutive-failure counter so the circuit can close again.
pub async fn mark_failed(&self, id: i64, err: &str) -> Result<(), Error>
Upload failed. If the row has been tried fewer than
MAX_RETRY_COUNT times, bounce it back to pending so a later
drain can retry. Otherwise flip it to abandoned — we keep the
row for diagnostics but will never re-claim it.
This also increments the consecutive-failure counter and, once
the counter reaches the threshold, opens the circuit for
CIRCUIT_OPEN_DURATION_MS.
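A minimal sketch of the retry and breaker bookkeeping follows. The constant values, the name `FAILURE_THRESHOLD`, and the types `RowSketch` and `BreakerSketch` are all assumptions for illustration. Whether the retry count includes the current attempt is not stated on this page; the sketch assumes it does.

```rust
// Hypothetical values; the real constants are defined in the crate.
pub const MAX_RETRY_COUNT: u32 = 3;
pub const FAILURE_THRESHOLD: u32 = 5;
pub const CIRCUIT_OPEN_DURATION_MS: i64 = 60_000;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RowStatus {
    Pending,
    Processing,
    Abandoned,
}

pub struct RowSketch {
    pub status: RowStatus,
    pub retry_count: u32,
}

#[derive(Default)]
pub struct BreakerSketch {
    pub consecutive_failures: u32,
    pub open_until_ms: Option<i64>,
}

/// Model of a failed upload: requeue or abandon the row, then update the breaker.
pub fn mark_failed(row: &mut RowSketch, breaker: &mut BreakerSketch, now_ms: i64) {
    // Assumption: the current attempt counts toward the retry limit.
    row.retry_count += 1;
    row.status = if row.retry_count < MAX_RETRY_COUNT {
        RowStatus::Pending
    } else {
        RowStatus::Abandoned
    };
    breaker.consecutive_failures += 1;
    if breaker.consecutive_failures >= FAILURE_THRESHOLD {
        breaker.open_until_ms = Some(now_ms + CIRCUIT_OPEN_DURATION_MS);
    }
}

/// Model of a successful upload: only the counter reset is shown.
pub fn confirm(breaker: &mut BreakerSketch) {
    breaker.consecutive_failures = 0;
}
```

Note that the per-row retry count and the breaker's consecutive-failure counter are independent: a row can be abandoned without the circuit opening, and the circuit can open on failures spread across several rows.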
pub async fn reset_stale(&self, threshold_secs: u64) -> Result<u64, Error>
Promote any processing rows older than threshold_secs back to
pending. Called at startup (see startup.rs) to recover from
crashed drains.
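The recovery rule can be modelled in memory as below. The `Claimed` type, its `claimed_at_secs` field, and the explicit `now_secs` parameter are hypothetical; the real method works against the database and takes only `threshold_secs`.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Pending,
    Processing,
}

pub struct Claimed {
    pub state: State,
    /// Assumed timestamp (seconds) of the most recent claim.
    pub claimed_at_secs: u64,
}

/// Move processing rows claimed more than `threshold_secs` ago back
/// to pending. Returns how many rows were reset.
pub fn reset_stale(rows: &mut [Claimed], now_secs: u64, threshold_secs: u64) -> u64 {
    let mut reset = 0;
    for row in rows.iter_mut() {
        let age = now_secs.saturating_sub(row.claimed_at_secs);
        if row.state == State::Processing && age > threshold_secs {
            row.state = State::Pending;
            reset += 1;
        }
    }
    reset
}
```

Rows claimed recently are left alone, since a drain that is still running may legitimately own them.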
pub async fn pending_counts_by_kind(&self) -> Result<Vec<(String, i64)>, Error>
Per-kind breakdown of status='pending' rows for lag warnings.
Sorted by kind for deterministic rendering. Uses runtime SQL so
this diagnostic helper does not require .sqlx cache updates.
pub async fn drain_abandoned_older_than( &self, cutoff_unix_ms: i64, dry_run: bool, ) -> Result<DrainSummary, Error>
Drain stale abandoned rows older than cutoff_unix_ms back to
the live queue. Returns a per-kind breakdown of rows that were
(or would be, in dry_run mode) reset.
Cutoff semantics: a row is eligible iff its most recent
claimed_at (last attempt) — or created_at if it was
abandoned before any attempt — is older than
cutoff_unix_ms. This deliberately leaves recently-abandoned
rows alone: those are almost certainly the symptom of a current
outage rather than a stale-auth backlog.
Tx-safety: the whole operation runs inside a single
BEGIN/COMMIT so a partial drain cannot leave the queue in a
half-reset state. dry_run = true still runs the SELECT but
rolls back instead of committing, so it has the same
transactional snapshot guarantees as the real path while
remaining strictly read-only.
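The cutoff rule and the dry-run behaviour can be sketched as an in-memory model. `DrainRow`, `DrainStatus`, and the function names are hypothetical, and a `BTreeMap` stands in for `DrainSummary`, whose fields are not shown on this page. The sketch only flips the status; whether the real drain also resets retry counters is not stated here.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainStatus {
    Pending,
    Abandoned,
}

pub struct DrainRow {
    pub kind: String,
    pub status: DrainStatus,
    /// None if the row was abandoned before any attempt.
    pub claimed_at_ms: Option<i64>,
    pub created_at_ms: i64,
}

/// Eligible iff abandoned and the last attempt (or creation time,
/// when there was no attempt) is older than the cutoff.
pub fn is_eligible(row: &DrainRow, cutoff_unix_ms: i64) -> bool {
    row.status == DrainStatus::Abandoned
        && row.claimed_at_ms.unwrap_or(row.created_at_ms) < cutoff_unix_ms
}

/// Returns a per-kind count of rows that were (or, in dry-run mode,
/// would be) reset. Dry-run leaves every row untouched.
pub fn drain_abandoned(
    rows: &mut [DrainRow],
    cutoff_unix_ms: i64,
    dry_run: bool,
) -> BTreeMap<String, u64> {
    let mut per_kind = BTreeMap::new();
    for row in rows.iter_mut() {
        if !is_eligible(row, cutoff_unix_ms) {
            continue;
        }
        *per_kind.entry(row.kind.clone()).or_insert(0) += 1;
        if !dry_run {
            row.status = DrainStatus::Pending;
        }
    }
    per_kind
}
```

Running the dry-run first and comparing its summary with the real run gives a cheap sanity check before anything is requeued.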
pub async fn counts(&self) -> Result<OutboxCounts, Error>
Number of rows in each status bucket. Diagnostics only — e.g.
difflore doctor surfaces this so users can see a backlog
building up.
Trait Implementations
impl Clone for OutboxQueue
fn clone(&self) -> OutboxQueue
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
Auto Trait Implementations
impl !RefUnwindSafe for OutboxQueue
impl !UnwindSafe for OutboxQueue
impl Freeze for OutboxQueue
impl Send for OutboxQueue
impl Sync for OutboxQueue
impl Unpin for OutboxQueue
impl UnsafeUnpin for OutboxQueue