ppoppo-infra 0.1.0

Backend-agnostic infrastructure traits for caching, queuing, and messaging
Documentation
//! Backend-agnostic staging flusher trait.
//!
//! Consumes items from a staging queue via partition-based flush.
//! The write side is [`StagingWriter`](crate::StagingWriter); this is the read/drain side.

use async_trait::async_trait;

use crate::Result;

/// An item retrieved from a flushed staging partition.
#[derive(Debug, Clone)]
pub struct StagingItem {
    /// Unique item ID (ULID).
    pub id: String,
    /// Queue name this item belongs to.
    pub queue_name: String,
    /// Item payload data.
    pub payload: serde_json::Value,
}

/// Staging queue consumer (flush/drain side).
///
/// Manages time-range partitions: ensures future partitions exist,
/// lists sealed (past) partitions, flushes them, and recovers orphans.
#[async_trait]
pub trait StagingFlusher: Send + Sync {
    /// Ensure partitions exist for the next `ahead_count` time windows.
    async fn ensure_partitions(&self, ahead_count: u32) -> Result<()>;

    /// List partitions whose time range has ended (sealed) before the given cutoff.
    async fn list_sealed_partitions(
        &self,
        cutoff: time::OffsetDateTime,
    ) -> Result<Vec<String>>;

    /// Flush a sealed partition: DETACH, read all items, DROP.
    ///
    /// Returns the items that were in the partition.
    async fn flush_partition(&self, partition_name: &str) -> Result<Vec<StagingItem>>;

    /// List orphaned partitions (detached but not dropped from a previous crash).
    async fn list_orphaned_partitions(&self) -> Result<Vec<String>>;

    /// Recover items from an orphaned partition and drop it.
    async fn recover_orphaned_partition(
        &self,
        partition_name: &str,
    ) -> Result<Vec<StagingItem>>;

    /// Count total items in the staging queue for a given logical queue name.
    ///
    /// Used by health check endpoints for operational visibility.
    /// Returns -1 on error by convention (callers should treat as "unknown").
    async fn depth(&self, queue_name: &str) -> Result<i64>;
}