faucet-core 1.1.0

Shared types, traits, and utilities for the faucet-stream ecosystem
Documentation
//! Options struct passed to `run_stream`. Replaces the prior positional
//! argument list so future observability additions don't keep breaking the
//! function signature.

use crate::dlq::DlqConfig;
use crate::state::StateStore;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

#[derive(Default, Clone)]
pub struct RunStreamOptions {
    pub state_store: Option<Arc<dyn StateStore>>,
    pub state_key: Option<String>,
    pub pipeline_name: Option<String>,
    pub row: Option<String>,
    pub run_id: Option<String>,
    pub dlq: Option<DlqConfig>,
    #[cfg(feature = "quality")]
    pub quality: Option<std::sync::Arc<crate::quality::CompiledQuality>>,
    /// Adaptive batch-size controller config; `None` (or `enabled = false`)
    /// leaves the per-page write path unchanged.
    pub adaptive: Option<crate::adaptive::AdaptiveBatchConfig>,
    /// Cooperative cancellation. When set and cancelled mid-run, the streaming
    /// loop stops polling new pages, **flushes the sinks** (so a buffered sink
    /// like Parquet writes its footer / completes its upload rather than
    /// orphaning the file), and returns the partial result. Without this, a
    /// dropped run future loses everything written-but-unflushed (#146 H16).
    pub cancel: Option<CancellationToken>,
    /// Delivery guarantee. `AtLeastOnce` (default) leaves the write path
    /// unchanged. `ExactlyOnce` enables the resume/skip + atomic-token path.
    pub delivery: crate::idempotency::DeliveryMode,
    /// Resume sequence read from the (unwrapped) exactly-once state value.
    /// Ignored unless `delivery == ExactlyOnce`. Defaults to 0.
    pub start_seq: u64,
}

impl RunStreamOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_state(mut self, store: Arc<dyn StateStore>, key: impl Into<String>) -> Self {
        self.state_store = Some(store);
        self.state_key = Some(key.into());
        self
    }

    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.pipeline_name = Some(name.into());
        self
    }

    pub fn with_row(mut self, row: impl Into<String>) -> Self {
        self.row = Some(row.into());
        self
    }

    pub fn with_run_id(mut self, id: impl Into<String>) -> Self {
        self.run_id = Some(id.into());
        self
    }

    pub fn with_dlq(mut self, dlq: DlqConfig) -> Self {
        self.dlq = Some(dlq);
        self
    }

    /// Attach a cancellation token for cooperative, flush-completing cancel.
    pub fn with_cancel(mut self, cancel: CancellationToken) -> Self {
        self.cancel = Some(cancel);
        self
    }

    /// Attach an adaptive batch-size controller config.
    pub fn with_adaptive(mut self, cfg: crate::adaptive::AdaptiveBatchConfig) -> Self {
        self.adaptive = Some(cfg);
        self
    }

    #[cfg(feature = "quality")]
    pub fn with_quality(
        mut self,
        quality: std::sync::Arc<crate::quality::CompiledQuality>,
    ) -> Self {
        self.quality = Some(quality);
        self
    }

    /// Set the delivery mode.
    pub fn with_delivery(mut self, mode: crate::idempotency::DeliveryMode) -> Self {
        self.delivery = mode;
        self
    }

    /// Set the resume sequence (exactly-once). Normally derived by
    /// `Pipeline::run` from the unwrapped state value.
    pub fn with_start_seq(mut self, seq: u64) -> Self {
        self.start_seq = seq;
        self
    }
}