faucet_core/observability/
options.rs1use crate::dlq::DlqConfig;
6use crate::state::StateStore;
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9
10#[derive(Default, Clone)]
11pub struct RunStreamOptions {
12 pub state_store: Option<Arc<dyn StateStore>>,
13 pub state_key: Option<String>,
14 pub pipeline_name: Option<String>,
15 pub row: Option<String>,
16 pub run_id: Option<String>,
17 pub dlq: Option<DlqConfig>,
18 #[cfg(feature = "quality")]
19 pub quality: Option<std::sync::Arc<crate::quality::CompiledQuality>>,
20 pub adaptive: Option<crate::adaptive::AdaptiveBatchConfig>,
23 pub cancel: Option<CancellationToken>,
29 pub delivery: crate::idempotency::DeliveryMode,
32 pub start_seq: u64,
35}
36
37impl RunStreamOptions {
38 pub fn new() -> Self {
39 Self::default()
40 }
41
42 pub fn with_state(mut self, store: Arc<dyn StateStore>, key: impl Into<String>) -> Self {
43 self.state_store = Some(store);
44 self.state_key = Some(key.into());
45 self
46 }
47
48 pub fn with_name(mut self, name: impl Into<String>) -> Self {
49 self.pipeline_name = Some(name.into());
50 self
51 }
52
53 pub fn with_row(mut self, row: impl Into<String>) -> Self {
54 self.row = Some(row.into());
55 self
56 }
57
58 pub fn with_run_id(mut self, id: impl Into<String>) -> Self {
59 self.run_id = Some(id.into());
60 self
61 }
62
63 pub fn with_dlq(mut self, dlq: DlqConfig) -> Self {
64 self.dlq = Some(dlq);
65 self
66 }
67
68 pub fn with_cancel(mut self, cancel: CancellationToken) -> Self {
70 self.cancel = Some(cancel);
71 self
72 }
73
74 pub fn with_adaptive(mut self, cfg: crate::adaptive::AdaptiveBatchConfig) -> Self {
76 self.adaptive = Some(cfg);
77 self
78 }
79
80 #[cfg(feature = "quality")]
81 pub fn with_quality(
82 mut self,
83 quality: std::sync::Arc<crate::quality::CompiledQuality>,
84 ) -> Self {
85 self.quality = Some(quality);
86 self
87 }
88
89 pub fn with_delivery(mut self, mode: crate::idempotency::DeliveryMode) -> Self {
91 self.delivery = mode;
92 self
93 }
94
95 pub fn with_start_seq(mut self, seq: u64) -> Self {
98 self.start_seq = seq;
99 self
100 }
101}