Skip to main content

faucet_core/observability/
options.rs

1//! Options struct passed to `run_stream`. Replaces the prior positional
2//! argument list so future observability additions don't keep breaking the
3//! function signature.
4
5use crate::dlq::DlqConfig;
6use crate::state::StateStore;
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9
10#[derive(Default, Clone)]
11pub struct RunStreamOptions {
12    pub state_store: Option<Arc<dyn StateStore>>,
13    pub state_key: Option<String>,
14    pub pipeline_name: Option<String>,
15    pub row: Option<String>,
16    pub run_id: Option<String>,
17    pub dlq: Option<DlqConfig>,
18    #[cfg(feature = "quality")]
19    pub quality: Option<std::sync::Arc<crate::quality::CompiledQuality>>,
20    /// Adaptive batch-size controller config; `None` (or `enabled = false`)
21    /// leaves the per-page write path unchanged.
22    pub adaptive: Option<crate::adaptive::AdaptiveBatchConfig>,
23    /// Cooperative cancellation. When set and cancelled mid-run, the streaming
24    /// loop stops polling new pages, **flushes the sinks** (so a buffered sink
25    /// like Parquet writes its footer / completes its upload rather than
26    /// orphaning the file), and returns the partial result. Without this, a
27    /// dropped run future loses everything written-but-unflushed (#146 H16).
28    pub cancel: Option<CancellationToken>,
29    /// Delivery guarantee. `AtLeastOnce` (default) leaves the write path
30    /// unchanged. `ExactlyOnce` enables the resume/skip + atomic-token path.
31    pub delivery: crate::idempotency::DeliveryMode,
32    /// Resume sequence read from the (unwrapped) exactly-once state value.
33    /// Ignored unless `delivery == ExactlyOnce`. Defaults to 0.
34    pub start_seq: u64,
35}
36
37impl RunStreamOptions {
38    pub fn new() -> Self {
39        Self::default()
40    }
41
42    pub fn with_state(mut self, store: Arc<dyn StateStore>, key: impl Into<String>) -> Self {
43        self.state_store = Some(store);
44        self.state_key = Some(key.into());
45        self
46    }
47
48    pub fn with_name(mut self, name: impl Into<String>) -> Self {
49        self.pipeline_name = Some(name.into());
50        self
51    }
52
53    pub fn with_row(mut self, row: impl Into<String>) -> Self {
54        self.row = Some(row.into());
55        self
56    }
57
58    pub fn with_run_id(mut self, id: impl Into<String>) -> Self {
59        self.run_id = Some(id.into());
60        self
61    }
62
63    pub fn with_dlq(mut self, dlq: DlqConfig) -> Self {
64        self.dlq = Some(dlq);
65        self
66    }
67
68    /// Attach a cancellation token for cooperative, flush-completing cancel.
69    pub fn with_cancel(mut self, cancel: CancellationToken) -> Self {
70        self.cancel = Some(cancel);
71        self
72    }
73
74    /// Attach an adaptive batch-size controller config.
75    pub fn with_adaptive(mut self, cfg: crate::adaptive::AdaptiveBatchConfig) -> Self {
76        self.adaptive = Some(cfg);
77        self
78    }
79
80    #[cfg(feature = "quality")]
81    pub fn with_quality(
82        mut self,
83        quality: std::sync::Arc<crate::quality::CompiledQuality>,
84    ) -> Self {
85        self.quality = Some(quality);
86        self
87    }
88
89    /// Set the delivery mode.
90    pub fn with_delivery(mut self, mode: crate::idempotency::DeliveryMode) -> Self {
91        self.delivery = mode;
92        self
93    }
94
95    /// Set the resume sequence (exactly-once). Normally derived by
96    /// `Pipeline::run` from the unwrapped state value.
97    pub fn with_start_seq(mut self, seq: u64) -> Self {
98        self.start_seq = seq;
99        self
100    }
101}