Skip to main content

faucet_core/observability/
options.rs

1//! Options struct passed to `run_stream`. Replaces the prior positional
2//! argument list so future observability additions don't keep breaking the
3//! function signature.
4
5use crate::dlq::DlqConfig;
6use crate::state::StateStore;
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9
10#[derive(Default, Clone)]
11pub struct RunStreamOptions {
12    pub state_store: Option<Arc<dyn StateStore>>,
13    pub state_key: Option<String>,
14    pub pipeline_name: Option<String>,
15    pub row: Option<String>,
16    pub run_id: Option<String>,
17    pub dlq: Option<DlqConfig>,
18    #[cfg(feature = "quality")]
19    pub quality: Option<std::sync::Arc<crate::quality::CompiledQuality>>,
20    /// Adaptive batch-size controller config; `None` (or `enabled = false`)
21    /// leaves the per-page write path unchanged.
22    pub adaptive: Option<crate::adaptive::AdaptiveBatchConfig>,
23    /// Cooperative cancellation. When set and cancelled mid-run, the streaming
24    /// loop stops polling new pages, **flushes the sinks** (so a buffered sink
25    /// like Parquet writes its footer / completes its upload rather than
26    /// orphaning the file), and returns the partial result. Without this, a
27    /// dropped run future loses everything written-but-unflushed (#146 H16).
28    pub cancel: Option<CancellationToken>,
29}
30
31impl RunStreamOptions {
32    pub fn new() -> Self {
33        Self::default()
34    }
35
36    pub fn with_state(mut self, store: Arc<dyn StateStore>, key: impl Into<String>) -> Self {
37        self.state_store = Some(store);
38        self.state_key = Some(key.into());
39        self
40    }
41
42    pub fn with_name(mut self, name: impl Into<String>) -> Self {
43        self.pipeline_name = Some(name.into());
44        self
45    }
46
47    pub fn with_row(mut self, row: impl Into<String>) -> Self {
48        self.row = Some(row.into());
49        self
50    }
51
52    pub fn with_run_id(mut self, id: impl Into<String>) -> Self {
53        self.run_id = Some(id.into());
54        self
55    }
56
57    pub fn with_dlq(mut self, dlq: DlqConfig) -> Self {
58        self.dlq = Some(dlq);
59        self
60    }
61
62    /// Attach a cancellation token for cooperative, flush-completing cancel.
63    pub fn with_cancel(mut self, cancel: CancellationToken) -> Self {
64        self.cancel = Some(cancel);
65        self
66    }
67
68    /// Attach an adaptive batch-size controller config.
69    pub fn with_adaptive(mut self, cfg: crate::adaptive::AdaptiveBatchConfig) -> Self {
70        self.adaptive = Some(cfg);
71        self
72    }
73
74    #[cfg(feature = "quality")]
75    pub fn with_quality(
76        mut self,
77        quality: std::sync::Arc<crate::quality::CompiledQuality>,
78    ) -> Self {
79        self.quality = Some(quality);
80        self
81    }
82}