1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
//! Options struct passed to `run_stream`. Replaces the prior positional
//! argument list so future observability additions don't keep breaking the
//! function signature.
use crate::dlq::DlqConfig;
use crate::state::StateStore;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
/// Options for a single `run_stream` invocation.
///
/// Build with [`RunStreamOptions::new`] (or `Default::default()`) plus the
/// `with_*` methods, or with struct-update syntax. With the derived `Default`,
/// every `Option` field is `None`, `delivery` is its default (`AtLeastOnce`)
/// and `start_seq` is `0`.
#[derive(Default, Clone)]
pub struct RunStreamOptions {
    /// Store holding run state. Set together with `state_key` by
    /// [`RunStreamOptions::with_state`].
    pub state_store: Option<Arc<dyn StateStore>>,
    /// Key for this run's entry in `state_store`. Set together with
    /// `state_store` by [`RunStreamOptions::with_state`].
    pub state_key: Option<String>,
    /// Pipeline name, set by [`RunStreamOptions::with_name`].
    /// NOTE(review): presumably used to label logs/metrics — confirm.
    pub pipeline_name: Option<String>,
    /// Row identifier, set by [`RunStreamOptions::with_row`].
    /// NOTE(review): what a "row" denotes is not visible here — confirm.
    pub row: Option<String>,
    /// Identifier for this run, set by [`RunStreamOptions::with_run_id`].
    pub run_id: Option<String>,
    /// Dead-letter queue configuration, set by [`RunStreamOptions::with_dlq`].
    pub dlq: Option<DlqConfig>,
    /// Compiled quality configuration, set by `with_quality`. This field only
    /// exists when the `quality` feature is enabled.
    #[cfg(feature = "quality")]
    pub quality: Option<Arc<crate::quality::CompiledQuality>>,
    /// Adaptive batch-size controller config; `None` (or `enabled = false`)
    /// leaves the per-page write path unchanged.
    pub adaptive: Option<crate::adaptive::AdaptiveBatchConfig>,
    /// Cooperative cancellation. When set and cancelled mid-run, the streaming
    /// loop stops polling new pages, **flushes the sinks** (so a buffered sink
    /// like Parquet writes its footer / completes its upload rather than
    /// orphaning the file), and returns the partial result. Without this, a
    /// dropped run future loses everything written-but-unflushed (#146 H16).
    pub cancel: Option<CancellationToken>,
    /// Delivery guarantee. `AtLeastOnce` (default) leaves the write path
    /// unchanged. `ExactlyOnce` enables the resume/skip + atomic-token path.
    pub delivery: crate::idempotency::DeliveryMode,
    /// Resume sequence read from the (unwrapped) exactly-once state value.
    /// Ignored unless `delivery == ExactlyOnce`. Defaults to 0.
    pub start_seq: u64,
}
impl RunStreamOptions {
    /// Creates options with every setting at its default.
    ///
    /// Equivalent to `RunStreamOptions::default()`: all `Option` fields are
    /// `None`, `delivery` is its default and `start_seq` is `0`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets `state_store` and `state_key` together.
    #[must_use]
    pub fn with_state(mut self, store: Arc<dyn StateStore>, key: impl Into<String>) -> Self {
        self.state_store = Some(store);
        self.state_key = Some(key.into());
        self
    }

    /// Sets the pipeline name (`pipeline_name`).
    #[must_use]
    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.pipeline_name = Some(name.into());
        self
    }

    /// Sets the `row` field.
    #[must_use]
    pub fn with_row(mut self, row: impl Into<String>) -> Self {
        self.row = Some(row.into());
        self
    }

    /// Sets the run identifier (`run_id`).
    #[must_use]
    pub fn with_run_id(mut self, id: impl Into<String>) -> Self {
        self.run_id = Some(id.into());
        self
    }

    /// Attaches a dead-letter queue configuration (`dlq`).
    #[must_use]
    pub fn with_dlq(mut self, dlq: DlqConfig) -> Self {
        self.dlq = Some(dlq);
        self
    }

    /// Attaches a cancellation token for cooperative, flush-completing cancel.
    ///
    /// See the `cancel` field for what happens when the token fires mid-run.
    #[must_use]
    pub fn with_cancel(mut self, cancel: CancellationToken) -> Self {
        self.cancel = Some(cancel);
        self
    }

    /// Attaches an adaptive batch-size controller config.
    #[must_use]
    pub fn with_adaptive(mut self, cfg: crate::adaptive::AdaptiveBatchConfig) -> Self {
        self.adaptive = Some(cfg);
        self
    }

    /// Attaches compiled quality configuration (`quality`).
    ///
    /// Only available with the `quality` feature.
    #[cfg(feature = "quality")]
    #[must_use]
    pub fn with_quality(mut self, quality: Arc<crate::quality::CompiledQuality>) -> Self {
        self.quality = Some(quality);
        self
    }

    /// Sets the delivery mode.
    ///
    /// `AtLeastOnce` (the default) leaves the write path unchanged;
    /// `ExactlyOnce` enables the resume/skip + atomic-token path.
    #[must_use]
    pub fn with_delivery(mut self, mode: crate::idempotency::DeliveryMode) -> Self {
        self.delivery = mode;
        self
    }

    /// Sets the resume sequence (exactly-once).
    ///
    /// Normally derived by `Pipeline::run` from the unwrapped state value.
    /// Ignored unless the delivery mode is `ExactlyOnce`.
    #[must_use]
    pub fn with_start_seq(mut self, seq: u64) -> Self {
        self.start_seq = seq;
        self
    }
}