Skip to main content

faucet_core/
pipeline.rs

1//! Source-to-sink pipeline orchestration.
2//!
3//! The [`Pipeline`] struct connects any [`Source`] to any
4//! [`Sink`] and handles moving data between them.
5//!
6//! # Batch mode
7//!
//! [`Pipeline::run`] drains the source to completion in a single call,
//! writing records to the sink page by page as they arrive (see *Streaming
//! mode* below).  Supports incremental replication (returns a bookmark for
//! the next run).
11//!
12//! ```rust,no_run
13//! use faucet_core::{Pipeline, Source, Sink};
14//! # async fn example(source: impl Source, sink: impl Sink) -> Result<(), faucet_core::FaucetError> {
15//! let result = Pipeline::new(&source, &sink).run().await?;
16//! println!("wrote {} records", result.records_written);
17//! // Persist result.bookmark for the next incremental run
18//! # Ok(())
19//! # }
20//! ```
21//!
22//! # Streaming mode
23//!
24//! Writes records page-by-page as they arrive from a source's
25//! [`stream_pages`](crate::Source::stream_pages) implementation, keeping
26//! memory usage bounded.  [`Pipeline::run`] uses this internally; callers
27//! that have already assembled a [`StreamPage`] stream can drive it directly
28//! via [`run_stream`].
29//!
30//! ```rust,no_run
31//! use faucet_core::{run_stream, RunStreamOptions, Sink, StreamPage, FaucetError};
32//! use futures_core::Stream;
33//! # async fn example(
34//! #     pages: impl Stream<Item = Result<StreamPage, FaucetError>> + Unpin,
35//! #     sink: impl Sink,
36//! # ) -> Result<(), FaucetError> {
37//! let result = run_stream(pages, &sink, RunStreamOptions::new()).await?;
38//! # Ok(())
39//! # }
40//! ```
41
42use crate::dlq::{DlqConfig, DlqStats};
43use crate::error::FaucetError;
44use crate::observability::RunStreamOptions;
45use crate::state::{StateStore, validate_state_key};
46use crate::traits::{Sink, Source};
47use futures_core::Stream;
48use serde_json::Value;
49use std::pin::Pin;
50use std::sync::Arc;
51
/// Page size the pipeline requests when the caller has not chosen one.
///
/// This is only a hint: a [`Source::stream_pages`] implementation may
/// substitute a page size from its own configuration, so the value matters
/// mainly for sources that have no preference of their own.
pub const DEFAULT_BATCH_SIZE: usize = 1_000;
58
/// Hard upper bound on `batch_size`.
///
/// [`validate_batch_size`] rejects any value above this bound at config
/// validation time to prevent accidental O(total) buffering in the default
/// implementation of [`Source::stream_pages`]. The special value `0` (the
/// "no batching" sentinel) lies below the bound and is therefore accepted.
pub const MAX_BATCH_SIZE: usize = 1_000_000;
64
65/// Validate a `batch_size` value against the global constraints.
66///
67/// `batch_size = 0` is the **opt-out-of-batching sentinel**: sources and
68/// sinks should treat it as "emit / accept the entire result set in one
69/// page." This is useful for small lookup tables or for sinks (e.g. SQL
70/// `COPY`, BigQuery load jobs) that prefer one large request to many small
71/// ones. Any non-zero value above [`MAX_BATCH_SIZE`] is rejected to prevent
72/// accidental unbounded buffering through a typo.
73///
74/// Returns the unchanged value on success. Returns `FaucetError::Config`
75/// only for values strictly greater than [`MAX_BATCH_SIZE`].
76pub fn validate_batch_size(batch_size: usize) -> Result<usize, FaucetError> {
77    if batch_size > MAX_BATCH_SIZE {
78        return Err(FaucetError::Config(format!(
79            "batch_size {batch_size} exceeds maximum {MAX_BATCH_SIZE} \
80             (use 0 to opt out of batching entirely)"
81        )));
82    }
83    Ok(batch_size)
84}
85
/// One page emitted by [`Source::stream_pages`].
///
/// `records` is the chunk of records for this page. `bookmark` is `Some` only
/// when the source has a durable checkpoint to advance — most sources emit
/// `Some` only on the final page (max-replication-value semantics); CDC-style
/// sources emit `Some` per committed transaction. The pipeline flushes the
/// sink and persists the bookmark every time a page carries one, so a
/// mid-stream crash never advances past records the sink has not durably
/// written.
///
/// The derived [`Default`] is a page with no records and no bookmark.
#[derive(Debug, Clone, Default)]
pub struct StreamPage {
    /// Records to write to the sink for this page. May be empty when the
    /// page exists only to carry a bookmark.
    pub records: Vec<Value>,
    /// Optional bookmark to checkpoint after this page is durably written.
    /// `None` means this page does not advance the checkpoint.
    pub bookmark: Option<Value>,
}
102
/// Result of a pipeline run.
///
/// Returned by [`Pipeline::run`] and [`run_stream`].
#[derive(Debug, Clone)]
pub struct PipelineResult {
    /// Total number of records written to the sink.
    pub records_written: usize,
    /// Bookmark value for incremental replication.
    ///
    /// `Some(value)` when the source returned a bookmark on its final
    /// (or, for streaming CDC sources, most recent) page. Persist this and
    /// pass it back as `start_replication_value` on the next run; this is
    /// handled automatically when a [`StateStore`] is attached via
    /// [`Pipeline::with_state_store`].
    pub bookmark: Option<Value>,
    /// DLQ counters. `None` when no DLQ is configured.
    pub dlq: Option<DlqStats>,
}
119
/// A pipeline that moves data from a [`Source`] to a [`Sink`].
///
/// The pipeline is generic over the source and sink types — any combination
/// of connectors works as long as they implement the respective traits.
/// Both are borrowed for `'a`; the `?Sized` bounds also admit trait objects
/// (`&dyn Source`, `&dyn Sink`).
pub struct Pipeline<'a, So: Source + ?Sized, Si: Sink + ?Sized> {
    /// Source that records are read from.
    source: &'a So,
    /// Sink that records are written to.
    sink: &'a Si,
    /// Bookmark store for incremental replication; set by `with_state_store`.
    state_store: Option<Arc<dyn StateStore>>,
    /// Pipeline name for spans and metric labels; `run` falls back to
    /// `"unnamed"` when unset.
    name: Option<String>,
    /// Matrix row id for spans and metric labels; `run` falls back to the
    /// empty string when unset.
    row: Option<String>,
    /// Explicit run id; `run` generates a UUIDv7 when unset.
    run_id: Option<String>,
    /// Dead-letter-queue configuration; set by `with_dlq`.
    dlq: Option<DlqConfig>,
    /// Compiled quality spec; only present with the `quality` feature.
    #[cfg(feature = "quality")]
    quality: Option<Arc<crate::quality::CompiledQuality>>,
    /// Adaptive batch-size configuration; set by `with_adaptive`.
    adaptive: Option<crate::adaptive::AdaptiveBatchConfig>,
    /// Cancellation token forwarded to the streaming loop; set by
    /// `with_cancel`.
    cancel: Option<tokio_util::sync::CancellationToken>,
}
137
138impl<'a, So: Source + ?Sized, Si: Sink + ?Sized> Pipeline<'a, So, Si> {
139    /// Create a new pipeline from a source and a sink.
140    pub fn new(source: &'a So, sink: &'a Si) -> Self {
141        Self {
142            source,
143            sink,
144            state_store: None,
145            name: None,
146            row: None,
147            run_id: None,
148            dlq: None,
149            #[cfg(feature = "quality")]
150            quality: None,
151            adaptive: None,
152            cancel: None,
153        }
154    }
155
156    /// Attach a [`StateStore`] for persistent incremental-replication bookmarks.
157    ///
158    /// When configured, `run()` will:
159    /// 1. Read any previously stored bookmark at the source's
160    ///    [`state_key`](Source::state_key) and call
161    ///    [`apply_start_bookmark`](Source::apply_start_bookmark) on the source
162    ///    so it can resume from that point.
163    /// 2. Run the fetch + write as usual.
164    /// 3. Persist the new bookmark **only after** the sink confirms the
165    ///    batch was written and flushed.
166    ///
167    /// Sources that do not return a [`state_key`](Source::state_key) are
168    /// unaffected — the store is consulted only when the source opts in.
169    pub fn with_state_store(mut self, store: Arc<dyn StateStore>) -> Self {
170        self.state_store = Some(store);
171        self
172    }
173
174    /// Set the pipeline name used in spans and metric labels.
175    /// Defaults to `"unnamed"` when unset.
176    pub fn with_name(mut self, name: impl Into<String>) -> Self {
177        self.name = Some(name.into());
178        self
179    }
180
181    /// Set the matrix row id used in spans and metric labels.
182    /// Defaults to `""` (Prometheus treats empty labels as absent).
183    pub fn with_row(mut self, row: impl Into<String>) -> Self {
184        self.row = Some(row.into());
185        self
186    }
187
188    /// Set an explicit run id (UUIDv7-shaped). When unset, `Pipeline::run`
189    /// generates one. Used only as a tracing span attribute — never a metric
190    /// label.
191    pub fn with_run_id(mut self, run_id: impl Into<String>) -> Self {
192        self.run_id = Some(run_id.into());
193        self
194    }
195
196    /// Attach a DLQ for per-row failure routing.
197    pub fn with_dlq(mut self, dlq: DlqConfig) -> Self {
198        self.dlq = Some(dlq);
199        self
200    }
201
202    /// Attach a compiled quality spec. Checks run after transforms, before the
203    /// sink, per page.
204    #[cfg(feature = "quality")]
205    pub fn with_quality(mut self, quality: Arc<crate::quality::CompiledQuality>) -> Self {
206        self.quality = Some(quality);
207        self
208    }
209
210    /// Attach an adaptive batch-size controller (opt-in). When `enabled`, the
211    /// pipeline reslices each source page into sub-batches whose size the
212    /// controller tunes from observed sink latency + error rate.
213    pub fn with_adaptive(mut self, cfg: crate::adaptive::AdaptiveBatchConfig) -> Self {
214        self.adaptive = Some(cfg);
215        self
216    }
217
218    /// Attach a cancellation token. When cancelled mid-run, the streaming loop
219    /// stops at the next page boundary, flushes the sink(s) so buffered output
220    /// (e.g. a Parquet footer) is durable, and returns the partial result
221    /// instead of leaving the file unreadable (#146 H16).
222    pub fn with_cancel(mut self, cancel: tokio_util::sync::CancellationToken) -> Self {
223        self.cancel = Some(cancel);
224        self
225    }
226
    /// Run the pipeline in streaming mode.
    ///
    /// 1. Loads the stored bookmark and pushes it to the source (if a state
    ///    store is configured and the source returns a `state_key`).
    /// 2. Drives [`Source::stream_pages`] with [`DEFAULT_BATCH_SIZE`],
    ///    writing each page to the sink as it arrives via
    ///    [`Sink::write_batch`].
    /// 3. Whenever a page carries `Some(bookmark)`, flushes the sink and
    ///    persists the bookmark to the state store before polling the next
    ///    page. This makes per-page CDC checkpointing automatic.
    /// 4. Flushes the sink one final time after the stream completes.
    /// 5. Returns a [`PipelineResult`] with the total count and the last
    ///    bookmark observed.
    ///
    /// Steps 2–5 are delegated to [`run_stream`]; this method resolves the
    /// run identity, wraps the connectors for instrumentation, and forwards
    /// the configured options.
    ///
    /// # Errors
    ///
    /// Propagates the first error from state-key validation, the state-store
    /// read, [`Source::apply_start_bookmark`], or [`run_stream`].
    ///
    /// # Metrics
    ///
    /// Emits `faucet_pipeline_in_flight`,
    /// `faucet_pipeline_start_time_unix_seconds`,
    /// `faucet_pipeline_run_duration_seconds`, and — once per call, on both
    /// success and failure — `faucet_pipeline_runs_total`.
    pub async fn run(&self) -> Result<PipelineResult, FaucetError> {
        use crate::observability::{
            DurationGuard, InstrumentedSink, InstrumentedSource, InstrumentedStateStore, Labels,
        };
        use metrics::{Label, SharedString, counter, gauge};
        use tracing::Instrument;

        // Resolve identity for this run.
        let name = self.name.clone().unwrap_or_else(|| "unnamed".to_string());
        let row = self.row.clone().unwrap_or_default();
        let run_id = self
            .run_id
            .clone()
            .unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
        let obs_labels = Labels::new(name.clone(), row.clone(), run_id.clone());

        // Wrap source, sink, state-store.
        let wrapped_source = InstrumentedSource::new(self.source, obs_labels.clone());
        let wrapped_sink = InstrumentedSink::new(self.sink, obs_labels.clone());
        let wrapped_state_store: Option<Arc<dyn StateStore>> = self.state_store.as_ref().map(|s| {
            Arc::new(InstrumentedStateStore::new(
                Arc::clone(s),
                obs_labels.clone(),
            )) as Arc<dyn StateStore>
        });

        // Pipeline-level span. Use .instrument(span) on the inner future so
        // the span correctly enters/exits across awaits.
        let span = tracing::info_span!(
            "faucet.pipeline.run",
            pipeline = %name,
            row = %row,
            run_id = %run_id,
            source = %wrapped_source.connector_name(),
            sink = %wrapped_sink.connector_name(),
        );

        // Per-pipeline metric labels (pipeline + row).
        let base_labels: Vec<Label> = vec![
            Label::new("pipeline", SharedString::from(name.clone())),
            Label::new("row", SharedString::from(row.clone())),
        ];
        // Run-level labels: the base labels plus the source and sink
        // connector names.
        let run_labels: Vec<Label> = {
            let mut v = base_labels.clone();
            v.push(Label::new(
                "source",
                SharedString::from(wrapped_source.connector_name().to_string()),
            ));
            v.push(Label::new(
                "sink",
                SharedString::from(wrapped_sink.connector_name().to_string()),
            ));
            v
        };

        // RAII guard so the in-flight gauge stays consistent even on cancellation.
        struct InFlightGuard(Vec<Label>);
        impl Drop for InFlightGuard {
            fn drop(&mut self) {
                gauge!("faucet_pipeline_in_flight", self.0.clone()).decrement(1.0);
            }
        }
        // Increment first, then arm the guard that undoes it on drop.
        gauge!("faucet_pipeline_in_flight", base_labels.clone()).increment(1.0);
        let _in_flight = InFlightGuard(base_labels.clone());

        // Stamp the start time so dashboards can compute uptime for long-running
        // (streaming / CDC) pipelines where `*_run_duration_seconds` never fires.
        // A system clock set before the Unix epoch makes `duration_since` fail;
        // report 0 in that case rather than failing the run.
        let start_unix = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs_f64())
            .unwrap_or(0.0);
        gauge!(
            "faucet_pipeline_start_time_unix_seconds",
            base_labels.clone()
        )
        .set(start_unix);

        // Histogram timer for the whole run.
        let _run_timer =
            DurationGuard::new("faucet_pipeline_run_duration_seconds", run_labels.clone());

        // Run inside the span.
        let result = async {
            // Bookmark resume — goes through the wrapped state store so the
            // get is instrumented too.
            let state_key = self.source.state_key();
            if let (Some(store), Some(key)) = (wrapped_state_store.as_ref(), state_key.as_ref()) {
                validate_state_key(key)?;
                if let Some(prior) = store.get(key).await? {
                    wrapped_source.apply_start_bookmark(prior).await?;
                }
            }

            // An empty context map is passed to the source.
            let ctx = std::collections::HashMap::new();
            let pages = wrapped_source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);

            // Forward the run identity plus every optional setting that was
            // configured on this pipeline.
            let mut opts = RunStreamOptions::new()
                .with_name(name.clone())
                .with_row(row.clone())
                .with_run_id(run_id.clone());
            // State is only wired when both a store and a key are present.
            if let (Some(store), Some(key)) = (wrapped_state_store.clone(), state_key) {
                opts = opts.with_state(store, key);
            }
            if let Some(dlq) = self.dlq.clone() {
                opts = opts.with_dlq(dlq);
            }
            #[cfg(feature = "quality")]
            if let Some(q) = self.quality.clone() {
                opts = opts.with_quality(q);
            }
            if let Some(ad) = self.adaptive.clone() {
                opts = opts.with_adaptive(ad);
            }
            if let Some(cancel) = self.cancel.clone() {
                opts = opts.with_cancel(cancel);
            }

            run_stream(pages, &wrapped_sink, opts).await
        }
        .instrument(span)
        .await;

        // Final run-counter increment. On error, also attach a `kind` label
        // (matching the FaucetError variant) so dashboards can break out failed
        // runs by error type without spelunking the *_errors_total surfaces.
        let status = if result.is_ok() { "ok" } else { "err" };
        let mut final_labels = run_labels;
        final_labels.push(Label::new("status", SharedString::const_str(status)));
        if let Err(ref e) = result {
            final_labels.push(Label::new(
                "kind",
                SharedString::const_str(crate::observability::decorator::error_kind(e)),
            ));
        }
        counter!("faucet_pipeline_runs_total", final_labels).increment(1);

        result
    }
378}
379
380/// Run a streaming pipeline, writing each [`StreamPage`] to the sink as it
381/// arrives and persisting bookmarks per page.
382///
383/// This keeps memory usage bounded — only one page of records is held at a
384/// time. The stream comes from [`Source::stream_pages`] (or any
385/// `Stream<Item = Result<StreamPage, FaucetError>>` a caller assembles
386/// directly).
387///
388/// Bookmark semantics: whenever a page carries `Some(bookmark)`, the sink is
389/// flushed and the bookmark is persisted (when `state_store` and `state_key`
390/// are both `Some`) before the next page is polled. Sources that only know
391/// their bookmark after seeing every record emit `Some` on the final page;
392/// CDC-style sources emit `Some` per committed transaction and get
393/// per-transaction durability automatically.
394///
395/// Returns the cumulative [`PipelineResult`] — `records_written` is the sum
396/// across all pages and `bookmark` is the last per-page bookmark observed.
397pub async fn run_stream<S, Si>(
398    mut pages: S,
399    sink: &Si,
400    options: RunStreamOptions,
401) -> Result<PipelineResult, FaucetError>
402where
403    S: Stream<Item = Result<StreamPage, FaucetError>> + Unpin,
404    Si: Sink + ?Sized,
405{
406    use crate::dlq::{DlqStats, OnBatchError, build_envelope};
407
408    let state_store = options.state_store.clone();
409    let state_key = options.state_key.clone();
410    let pipeline_name = options.pipeline_name.unwrap_or_else(|| "unnamed".into());
411    let row = options.row.unwrap_or_default();
412    let run_id = options.run_id.unwrap_or_default();
413    let dlq = options.dlq.clone();
414    let cancel = options.cancel.clone();
415
416    #[cfg(feature = "quality")]
417    let quality = options.quality.clone();
418
419    // Fail fast: quarantine requires a DLQ sink.
420    #[cfg(feature = "quality")]
421    if let Some(q) = quality.as_ref()
422        && q.requires_dlq()
423        && dlq.is_none()
424    {
425        return Err(FaucetError::Config(
426            "quality: on_failure 'quarantine'/'quarantine_batch' requires a DLQ sink".into(),
427        ));
428    }
429
430    if let Some(key) = state_key.as_ref() {
431        validate_state_key(key)?;
432    }
433
434    let mut records_written = 0usize;
435    let mut last_bookmark: Option<Value> = None;
436    let mut dlq_stats = DlqStats::default();
437
438    let adaptive_cfg = options.adaptive.clone().filter(|c| c.enabled);
439    // Validate at the core boundary so library callers of `run_stream` (not
440    // just the CLI, which validates earlier) reject an invalid adaptive config
441    // — e.g. the rejected `respect_source_max=false` knob — up front.
442    if let Some(cfg) = adaptive_cfg.as_ref() {
443        cfg.validate()?;
444    }
445    let mut controller: Option<crate::adaptive::AimdController> = None;
446    let mut warned_noop_sink = false;
447
448    let sink_name = sink.connector_name();
449    let dlq_sink_name = dlq.as_ref().map(|d| d.sink.connector_name()).unwrap_or("");
450
451    // Drive the streaming loop inside an inner future so that EVERY early exit
452    // (a source error, a `?`-propagated write/flush/state failure, or a DLQ
453    // budget overflow) funnels through one place. On any error we best-effort
454    // flush the sinks before propagating, so a buffered sink that only commits
455    // on flush — Parquet writes its footer there; without it the whole file is
456    // unreadable — does not lose everything written so far (#78/#3).
457    // Set when the loop exits because the cancellation token fired (vs. the
458    // stream ending naturally). Either way we fall through to the success-path
459    // flush below, so a buffered sink (Parquet footer, S3 multipart) is made
460    // durable — the difference from a dropped future, which flushes nothing.
461    let mut cancelled = false;
462    let loop_result: Result<(), FaucetError> = async {
463        loop {
464            // Poll the next page, but if a cancellation token is wired, race it
465            // so a cancel between pages stops the run promptly and cleanly
466            // (#146 H16). `biased` checks cancellation first each iteration.
467            let page = match &cancel {
468                Some(token) => tokio::select! {
469                    biased;
470                    _ = token.cancelled() => {
471                        cancelled = true;
472                        break;
473                    }
474                    p = std::future::poll_fn(|cx| Pin::new(&mut pages).poll_next(cx)) => p,
475                },
476                None => std::future::poll_fn(|cx| Pin::new(&mut pages).poll_next(cx)).await,
477            };
478            match page {
479                Some(Ok(page)) => {
480                    if page.records.is_empty() && page.bookmark.is_none() {
481                        continue;
482                    }
483
484                    // ── Quality pass (after transforms, before sink) ─────────
485                    #[cfg(feature = "quality")]
486                    let (records, quality_envelopes): (Vec<Value>, Vec<Value>) =
487                        if let Some(q) = quality.as_ref() {
488                            let labels =
489                                crate::observability::Labels::new(&*pipeline_name, &*row, &*run_id);
490                            let outcome = crate::observability::instrumented_apply_quality(
491                                page.records,
492                                q,
493                                &labels,
494                            )?;
495                            let envelopes: Vec<Value> = outcome
496                                .quarantined
497                                .iter()
498                                .map(|qr| {
499                                    let err = FaucetError::QualityFailure {
500                                        check: qr.check.to_string(),
501                                        message: qr.message.clone(),
502                                    };
503                                    // `record_index` is the position within the PAGE
504                                    // (the frozen envelope contract), not the index in
505                                    // the quarantine list (#146 R).
506                                    build_envelope(
507                                        &qr.record,
508                                        &err,
509                                        sink_name,
510                                        &pipeline_name,
511                                        &row,
512                                        qr.page_index,
513                                    )
514                                })
515                                .collect();
516                            (outcome.survivors, envelopes)
517                        } else {
518                            (page.records, Vec::new())
519                        };
520                    #[cfg(not(feature = "quality"))]
521                    let (records, quality_envelopes): (Vec<Value>, Vec<Value>) =
522                        (page.records, Vec::new());
523
524                    let page = StreamPage {
525                        records,
526                        bookmark: page.bookmark,
527                    };
528
529                    if let Some(ref dlq_cfg) = dlq {
530                        // ── DLQ-enabled path ───────────────────────────────────
531                        use crate::dlq::DlqReason;
532                        use metrics::{Label, SharedString, counter};
533                        let metric_labels: Vec<Label> = vec![
534                            Label::new("pipeline", SharedString::from(pipeline_name.clone())),
535                            Label::new("row", SharedString::from(row.clone())),
536                            Label::new("connector", SharedString::from(sink_name.to_string())),
537                            Label::new(
538                                "dlq_connector",
539                                SharedString::from(dlq_sink_name.to_string()),
540                            ),
541                        ];
542                        let span = tracing::info_span!(
543                            "faucet.dlq.route",
544                            pipeline = %pipeline_name,
545                            row = %row,
546                            run_id = %run_id,
547                            connector = %sink_name,
548                            dlq_connector = %dlq_sink_name,
549                        );
550                        let _enter = span.enter();
551
552                        // Reslice the page into sub-batches driven by the
553                        // adaptive controller (or write the whole page in one
554                        // shot when adaptive is disabled — same as before).
555                        let mut envelopes: Vec<Value> = Vec::new();
556                        let mut page_success = 0usize;
557                        let mut outer_err_recovered = false;
558                        // True if any chunk reported genuine per-row sink `Err`s
559                        // (as opposed to a chunk wholly synthesized from an outer
560                        // error under `DlqAll`). Drives the `partial` label when a
561                        // resliced page mixes the two failure modes.
562                        let mut had_per_row_sink_failure = false;
563                        let records_len = page.records.len();
564                        let mut offset = 0usize;
565                        while offset < records_len {
566                            let size = match adaptive_cfg.as_ref() {
567                                Some(cfg) => {
568                                    let ctrl = controller.get_or_insert_with(|| {
569                                        crate::adaptive::AimdController::new(cfg, records_len)
570                                    });
571                                    ctrl.current().max(1).min(records_len - offset)
572                                }
573                                None => records_len - offset, // whole page = today's behavior
574                            };
575                            if adaptive_cfg.is_some() {
576                                maybe_warn_noop_sink(sink_name, &mut warned_noop_sink);
577                            }
578                            let chunk = &page.records[offset..offset + size];
579                            let t0 = std::time::Instant::now();
580                            let chunk_outcomes_result = sink.write_batch_partial(chunk).await;
581                            let latency = t0.elapsed();
582                            // `chunk_synthesized` is true only when this chunk's
583                            // outcomes were fabricated from a single outer
584                            // `write_batch_partial` error under `DlqAll` — as
585                            // opposed to genuine per-row `Err`s the sink
586                            // reported. Tracking it per chunk keeps the page
587                            // `reason` label accurate when adaptive reslicing
588                            // mixes a synthesized chunk with partial-failure
589                            // chunks on the same page.
590                            let (chunk_outcomes, chunk_synthesized): (
591                                Vec<crate::RowOutcome>,
592                                bool,
593                            ) = match chunk_outcomes_result {
594                                Ok(o) => (o, false),
595                                Err(e) => match dlq_cfg.on_batch_error {
596                                    OnBatchError::Propagate => return Err(e),
597                                    OnBatchError::DlqAll => {
598                                        outer_err_recovered = true;
599                                        let msg = e.to_string();
600                                        let synth = (0..chunk.len())
601                                            .map(|_| Err(FaucetError::Sink(msg.clone())))
602                                            .collect();
603                                        (synth, true)
604                                    }
605                                },
606                            };
607                            let mut chunk_errors = 0usize;
608                            for (j, outcome) in chunk_outcomes.iter().enumerate() {
609                                match outcome {
610                                    Ok(()) => page_success += 1,
611                                    Err(err) => {
612                                        chunk_errors += 1;
613                                        if !chunk_synthesized {
614                                            had_per_row_sink_failure = true;
615                                        }
616                                        envelopes.push(build_envelope(
617                                            &chunk[j],
618                                            err,
619                                            sink_name,
620                                            &pipeline_name,
621                                            &row,
622                                            offset + j,
623                                        ));
624                                    }
625                                }
626                            }
627                            if let Some(ctrl) = controller.as_mut() {
628                                let adj = ctrl.observe(crate::adaptive::Observation {
629                                    batch_len: chunk.len(),
630                                    errors: chunk_errors,
631                                    latency,
632                                });
633                                emit_adaptive_metrics(ctrl, adj, &pipeline_name, &row);
634                            }
635                            offset += size;
636                        }
637                        // Quality-quarantined records share the DLQ budget/write.
638                        // Capture the quality count BEFORE the splice — the splice
639                        // moves `quality_envelopes`, so its length is unavailable
640                        // afterward. Used below to pick the page `reason` label.
641                        #[cfg(feature = "quality")]
642                        let quality_count = quality_envelopes.len();
643                        #[cfg(not(feature = "quality"))]
644                        let quality_count = 0usize;
645                        envelopes.splice(0..0, quality_envelopes);
646                        let page_failures = envelopes.len();
647
648                        // Budget checks. `write_batch_partial` above already
649                        // committed this page's survivors to the main sink, so
650                        // we must NOT abort here: returning now would strand
651                        // those committed survivors without advancing the
652                        // bookmark (they would re-deliver on the next run) and
653                        // drop this page's failures before they reach the DLQ
654                        // (#146 M4). Instead, record the budget error, finish
655                        // committing the page below (route failures to the DLQ,
656                        // flush, persist the bookmark), and abort only once the
657                        // page is fully durable. The failed rows that crossed
658                        // the threshold are still written to the DLQ — losing
659                        // them would be strictly worse than the small overshoot.
660                        let mut budget_error: Option<FaucetError> = None;
661                        if let Some(limit) = dlq_cfg.max_failures_per_page
662                            && page_failures > limit
663                        {
664                            let mut lbl = metric_labels.clone();
665                            lbl.retain(|l| l.key() != "dlq_connector");
666                            lbl.push(Label::new("scope", SharedString::const_str("per_page")));
667                            counter!("faucet_sink_dlq_budget_exceeded_total", lbl).increment(1);
668                            budget_error = Some(FaucetError::Sink(format!(
669                                "DLQ per-page budget exceeded: {page_failures} > {limit}"
670                            )));
671                        }
672                        let new_total = dlq_stats.records_dlq + page_failures;
673                        if budget_error.is_none()
674                            && let Some(limit) = dlq_cfg.max_failures_total
675                            && new_total > limit
676                        {
677                            let mut lbl = metric_labels.clone();
678                            lbl.retain(|l| l.key() != "dlq_connector");
679                            lbl.push(Label::new("scope", SharedString::const_str("total")));
680                            counter!("faucet_sink_dlq_budget_exceeded_total", lbl).increment(1);
681                            budget_error = Some(FaucetError::Sink(format!(
682                                "DLQ total budget exceeded: {new_total} > {limit}"
683                            )));
684                        }
685
686                        // Write to DLQ sink. Errors here are fatal, no recursion.
687                        if !envelopes.is_empty() {
688                            let _dlq_write_timer = crate::observability::DurationGuard::new(
689                                "faucet_sink_dlq_write_duration_seconds",
690                                metric_labels.clone(),
691                            );
692                            dlq_cfg.sink.write_batch(&envelopes).await.map_err(|e| {
693                                let mut lbl = metric_labels.clone();
694                                lbl.push(Label::new(
695                                    "kind",
696                                    SharedString::const_str(
697                                        crate::observability::decorator::error_kind(&e),
698                                    ),
699                                ));
700                                counter!("faucet_sink_dlq_errors_total", lbl).increment(1);
701                                FaucetError::Sink(format!("DLQ sink write failed: {e}"))
702                            })?;
703                            dlq_stats.records_dlq += page_failures;
704                            dlq_stats.pages_with_failures += 1;
705
706                            // Page `reason` label, 3-way (precedence: partial > dlq_all > quality):
707                            //  - `partial`  — at least one chunk reported genuine
708                            //    per-row sink `Err`s. Checked FIRST so a resliced
709                            //    page that mixes a synthesized chunk (DlqAll) with
710                            //    partial-failure chunks is labeled `partial` — the
711                            //    real per-row failure dominates. (For a
712                            //    non-resliced page this is equivalent to the old
713                            //    `page_failures > quality_count` test, since a
714                            //    single chunk is either all-synthesized or all
715                            //    per-row.)
716                            //  - `dlq_all`  — every sink-side failure on the page
717                            //    was synthesized from an outer `write_batch_partial`
718                            //    error (OnBatchError::DlqAll); no genuine per-row
719                            //    failures occurred.
720                            //  - `quality`  — every envelope is quality-sourced
721                            //    (no sink-side failures on this page).
722                            // The per-row quality volume is separately exposed via
723                            // `faucet_quality_records_quarantined_total`.
724                            let reason_label = if had_per_row_sink_failure {
725                                DlqReason::Partial.as_str()
726                            } else if outer_err_recovered {
727                                DlqReason::DlqAll.as_str()
728                            } else if page_failures > quality_count {
729                                DlqReason::Partial.as_str()
730                            } else {
731                                DlqReason::Quality.as_str()
732                            };
733                            counter!("faucet_sink_dlq_records_total", metric_labels.clone())
734                                .increment(page_failures as u64);
735                            let mut page_labels = metric_labels.clone();
736                            page_labels
737                                .push(Label::new("reason", SharedString::const_str(reason_label)));
738                            counter!("faucet_sink_dlq_pages_total", page_labels).increment(1);
739                        }
740
741                        records_written += page_success;
742
743                        if let Some(bookmark) = page.bookmark {
744                            sink.flush().await?;
745                            let _dlq_flush_timer = crate::observability::DurationGuard::new(
746                                "faucet_sink_dlq_flush_duration_seconds",
747                                metric_labels.clone(),
748                            );
749                            dlq_cfg.sink.flush().await.map_err(|e| {
750                                let mut lbl = metric_labels.clone();
751                                lbl.push(Label::new(
752                                    "kind",
753                                    SharedString::const_str(
754                                        crate::observability::decorator::error_kind(&e),
755                                    ),
756                                ));
757                                counter!("faucet_sink_dlq_errors_total", lbl).increment(1);
758                                FaucetError::Sink(format!("DLQ sink flush failed: {e}"))
759                            })?;
760                            let bm_labels =
761                                crate::observability::Labels::new(&*pipeline_name, &*row, &*run_id);
762                            crate::observability::update_bookmark_lag(&bookmark, &bm_labels);
763                            if let (Some(store), Some(key)) =
764                                (state_store.as_ref(), state_key.as_ref())
765                            {
766                                store.put(key, &bookmark).await?;
767                            }
768                            last_bookmark = Some(bookmark);
769                        }
770
771                        // The page is now durable — survivors committed to the
772                        // main sink, failures routed to the DLQ, and (if the
773                        // page carried one) the bookmark persisted. Honor a
774                        // deferred DLQ-budget abort now, so the run still stops
775                        // as a circuit breaker but never re-delivers this
776                        // already-committed page (#146 M4).
777                        if let Some(e) = budget_error {
778                            return Err(e);
779                        }
780                    } else {
781                        // ── DLQ-disabled path (today's behaviour) ──────────────
782                        debug_assert!(
783                            quality_envelopes.is_empty(),
784                            "quality quarantine without DLQ should have been rejected at run start"
785                        );
786                        if !page.records.is_empty() {
787                            if let Some(cfg) = adaptive_cfg.as_ref() {
788                                let ctrl = controller.get_or_insert_with(|| {
789                                    crate::adaptive::AimdController::new(cfg, page.records.len())
790                                });
791                                maybe_warn_noop_sink(sink_name, &mut warned_noop_sink);
792                                let mut offset = 0;
793                                while offset < page.records.len() {
794                                    let size =
795                                        ctrl.current().max(1).min(page.records.len() - offset);
796                                    let chunk = &page.records[offset..offset + size];
797                                    let t0 = std::time::Instant::now();
798                                    let n = sink.write_batch(chunk).await?;
799                                    let latency = t0.elapsed();
800                                    records_written += n;
801                                    offset += size;
802                                    let adj = ctrl.observe(crate::adaptive::Observation {
803                                        batch_len: chunk.len(),
804                                        errors: 0,
805                                        latency,
806                                    });
807                                    emit_adaptive_metrics(ctrl, adj, &pipeline_name, &row);
808                                }
809                            } else {
810                                records_written += sink.write_batch(&page.records).await?;
811                            }
812                        }
813                        if let Some(bookmark) = page.bookmark {
814                            sink.flush().await?;
815                            let bm_labels =
816                                crate::observability::Labels::new(&*pipeline_name, &*row, &*run_id);
817                            crate::observability::update_bookmark_lag(&bookmark, &bm_labels);
818                            if let (Some(store), Some(key)) =
819                                (state_store.as_ref(), state_key.as_ref())
820                            {
821                                store.put(key, &bookmark).await?;
822                            }
823                            last_bookmark = Some(bookmark);
824                        }
825                    }
826                }
827                Some(Err(e)) => return Err(e),
828                None => break,
829            }
830        }
831        Ok(())
832    }
833    .await;
834
835    // Error/early-return unwind: best-effort flush so any buffered output is
836    // made durable, then propagate the ORIGINAL error. Flush errors here are
837    // logged and swallowed — the source/sink error that triggered the unwind
838    // is the meaningful one to surface. DLQ is flushed first (mirroring the
839    // success path below): its records are only ever written here, whereas the
840    // next run re-reads post-bookmark records from the source.
841    if let Err(e) = loop_result {
842        if let Some(ref dlq_cfg) = dlq
843            && let Err(flush_err) = dlq_cfg.sink.flush().await
844        {
845            tracing::warn!(
846                error = %flush_err,
847                "DLQ sink flush failed during error unwind; original error preserved"
848            );
849        }
850        if let Err(flush_err) = sink.flush().await {
851            tracing::warn!(
852                error = %flush_err,
853                "sink flush failed during error unwind; original error preserved"
854            );
855        }
856        return Err(e);
857    }
858
859    // Flush the DLQ sink BEFORE the main sink so quarantined records are made
860    // durable even if the main sink's final flush fails. The next run will
861    // re-read post-bookmark records from the source and re-route any that
862    // would have fallen out of the main sink's unflushed buffer; DLQ records,
863    // by contrast, are only ever written here and would otherwise be lost.
864    if let Some(ref dlq_cfg) = dlq {
865        let final_metric_labels: Vec<metrics::Label> = vec![
866            metrics::Label::new(
867                "pipeline",
868                metrics::SharedString::from(pipeline_name.clone()),
869            ),
870            metrics::Label::new("row", metrics::SharedString::from(row.clone())),
871            metrics::Label::new(
872                "connector",
873                metrics::SharedString::from(sink_name.to_string()),
874            ),
875            metrics::Label::new(
876                "dlq_connector",
877                metrics::SharedString::from(dlq_sink_name.to_string()),
878            ),
879        ];
880        let _final_dlq_flush_timer = crate::observability::DurationGuard::new(
881            "faucet_sink_dlq_flush_duration_seconds",
882            final_metric_labels.clone(),
883        );
884        dlq_cfg.sink.flush().await.map_err(|e| {
885            let mut lbl = final_metric_labels.clone();
886            lbl.push(metrics::Label::new(
887                "kind",
888                metrics::SharedString::const_str(crate::observability::decorator::error_kind(&e)),
889            ));
890            metrics::counter!("faucet_sink_dlq_errors_total", lbl).increment(1);
891            FaucetError::Sink(format!("DLQ sink flush failed: {e}"))
892        })?;
893    }
894    sink.flush().await?;
895
896    if cancelled {
897        tracing::info!(
898            records_written,
899            "pipeline run cancelled cooperatively; sink flushed (partial output is durable)"
900        );
901    }
902
903    tracing::info!(
904        records_written,
905        cancelled,
906        has_bookmark = last_bookmark.is_some(),
907        persisted = state_store.is_some() && state_key.is_some() && last_bookmark.is_some(),
908        dlq_records = dlq_stats.records_dlq,
909        "pipeline streaming run complete"
910    );
911
912    Ok(PipelineResult {
913        records_written,
914        bookmark: last_bookmark,
915        dlq: dlq.is_some().then_some(dlq_stats),
916    })
917}
918
919/// Emit the adaptive controller's current state + any adjustment as metrics.
920/// Labels are `pipeline,row` only (the controller is pipeline-scoped).
921fn emit_adaptive_metrics(
922    ctrl: &crate::adaptive::AimdController,
923    adj: Option<crate::adaptive::Adjustment>,
924    pipeline: &str,
925    row: &str,
926) {
927    use metrics::{Label, SharedString, counter, gauge};
928    let base = vec![
929        Label::new("pipeline", SharedString::from(pipeline.to_string())),
930        Label::new("row", SharedString::from(row.to_string())),
931    ];
932    gauge!("faucet_pipeline_adaptive_batch_size", base.clone()).set(ctrl.current() as f64);
933    gauge!(
934        "faucet_pipeline_adaptive_batch_cooldown_active",
935        base.clone()
936    )
937    .set(if ctrl.cooldown_active() { 1.0 } else { 0.0 });
938    if let Some(p50) = ctrl.p50_latency_ms() {
939        gauge!(
940            "faucet_pipeline_adaptive_batch_p50_latency_ms",
941            base.clone()
942        )
943        .set(p50 as f64);
944    }
945    if let Some(a) = adj {
946        let mut lbl = base;
947        lbl.push(Label::new(
948            "direction",
949            SharedString::const_str(a.direction.as_str()),
950        ));
951        lbl.push(Label::new(
952            "reason",
953            SharedString::const_str(a.reason.as_str()),
954        ));
955        counter!("faucet_pipeline_adaptive_batch_adjustments_total", lbl).increment(1);
956    }
957}
958
959/// One-shot info when adaptive sizing targets a per-record sink that ignores
960/// `batch_size` (its adjustments are harmless no-ops).
961fn maybe_warn_noop_sink(sink_name: &str, warned: &mut bool) {
962    if !*warned && matches!(sink_name, "jsonl" | "csv" | "stdout") {
963        tracing::info!(
964            sink = sink_name,
965            "adaptive batch sizing is a no-op for this per-record sink"
966        );
967        *warned = true;
968    }
969}
970
971#[cfg(test)]
972mod tests {
973    use super::*;
974    use async_trait::async_trait;
975    use serde_json::json;
976
977    // ── Mock Source ──────────────────────────────────────────────────────────
978
979    struct MockSource(Vec<Value>);
980
981    #[async_trait]
982    impl Source for MockSource {
983        async fn fetch_with_context(
984            &self,
985            _context: &std::collections::HashMap<String, Value>,
986        ) -> Result<Vec<Value>, FaucetError> {
987            Ok(self.0.clone())
988        }
989    }
990
991    struct IncrementalSource {
992        records: Vec<Value>,
993        bookmark: Value,
994    }
995
996    #[async_trait]
997    impl Source for IncrementalSource {
998        async fn fetch_with_context(
999            &self,
1000            _context: &std::collections::HashMap<String, Value>,
1001        ) -> Result<Vec<Value>, FaucetError> {
1002            Ok(self.records.clone())
1003        }
1004        async fn fetch_with_context_incremental(
1005            &self,
1006            _context: &std::collections::HashMap<String, Value>,
1007        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
1008            Ok((self.records.clone(), Some(self.bookmark.clone())))
1009        }
1010    }
1011
1012    struct FailingSource;
1013
1014    #[async_trait]
1015    impl Source for FailingSource {
1016        async fn fetch_with_context(
1017            &self,
1018            _context: &std::collections::HashMap<String, Value>,
1019        ) -> Result<Vec<Value>, FaucetError> {
1020            Err(FaucetError::Auth("no credentials".into()))
1021        }
1022    }
1023
1024    // ── Mock Sink ───────────────────────────────────────────────────────────
1025
1026    struct MockSink(std::sync::Mutex<Vec<Value>>);
1027
1028    impl MockSink {
1029        fn new() -> Self {
1030            Self(std::sync::Mutex::new(Vec::new()))
1031        }
1032        fn written(&self) -> Vec<Value> {
1033            self.0.lock().unwrap().clone()
1034        }
1035    }
1036
1037    #[async_trait]
1038    impl Sink for MockSink {
1039        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
1040            self.0.lock().unwrap().extend(records.iter().cloned());
1041            Ok(records.len())
1042        }
1043    }
1044
1045    struct FailingSink;
1046
1047    #[async_trait]
1048    impl Sink for FailingSink {
1049        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
1050            Err(FaucetError::Sink("write failed".into()))
1051        }
1052    }
1053
1054    /// Records writes and how many times `flush` was called. Used to assert the
1055    /// pipeline flushes the sink on the error/early-return path so partial
1056    /// output (e.g. a Parquet footer) is made durable before the error
1057    /// propagates.
1058    struct FlushTrackingSink {
1059        written: std::sync::Mutex<Vec<Value>>,
1060        flush_count: std::sync::atomic::AtomicUsize,
1061    }
1062
1063    impl FlushTrackingSink {
1064        fn new() -> Self {
1065            Self {
1066                written: std::sync::Mutex::new(Vec::new()),
1067                flush_count: std::sync::atomic::AtomicUsize::new(0),
1068            }
1069        }
1070        fn written(&self) -> Vec<Value> {
1071            self.written.lock().unwrap().clone()
1072        }
1073        fn flush_count(&self) -> usize {
1074            self.flush_count.load(std::sync::atomic::Ordering::SeqCst)
1075        }
1076    }
1077
1078    #[async_trait]
1079    impl Sink for FlushTrackingSink {
1080        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
1081            self.written.lock().unwrap().extend(records.iter().cloned());
1082            Ok(records.len())
1083        }
1084        async fn flush(&self) -> Result<(), FaucetError> {
1085            self.flush_count
1086                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1087            Ok(())
1088        }
1089    }
1090
1091    // ── StreamPage / batch_size tests ───────────────────────────────────────
1092
1093    #[test]
1094    fn stream_page_constructs() {
1095        let page = StreamPage {
1096            records: vec![json!({"id": 1})],
1097            bookmark: Some(json!("2026-05-18")),
1098        };
1099        assert_eq!(page.records.len(), 1);
1100        assert_eq!(page.bookmark, Some(json!("2026-05-18")));
1101    }
1102
1103    #[test]
1104    fn validate_batch_size_accepts_zero_as_no_batching_sentinel() {
1105        // 0 means "do not batch — emit/accept the whole result set in one page".
1106        assert_eq!(validate_batch_size(0).unwrap(), 0);
1107    }
1108
1109    #[test]
1110    fn validate_batch_size_rejects_too_large() {
1111        let err = validate_batch_size(MAX_BATCH_SIZE + 1).unwrap_err();
1112        assert!(matches!(err, FaucetError::Config(_)));
1113    }
1114
1115    #[test]
1116    fn validate_batch_size_accepts_one() {
1117        assert_eq!(validate_batch_size(1).unwrap(), 1);
1118    }
1119
1120    #[test]
1121    fn validate_batch_size_accepts_max() {
1122        assert_eq!(validate_batch_size(MAX_BATCH_SIZE).unwrap(), MAX_BATCH_SIZE);
1123    }
1124
1125    // Compile-time invariant: DEFAULT_BATCH_SIZE must be within [1, MAX_BATCH_SIZE].
1126    const _: () = {
1127        assert!(DEFAULT_BATCH_SIZE >= 1);
1128        assert!(DEFAULT_BATCH_SIZE <= MAX_BATCH_SIZE);
1129    };
1130
1131    // ── Batch mode tests ────────────────────────────────────────────────────
1132
1133    #[tokio::test]
1134    async fn batch_pipeline_writes_all_records() {
1135        let source = MockSource(vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})]);
1136        let sink = MockSink::new();
1137
1138        let result = Pipeline::new(&source, &sink).run().await.unwrap();
1139
1140        assert_eq!(result.records_written, 3);
1141        assert!(result.bookmark.is_none());
1142        assert_eq!(sink.written().len(), 3);
1143    }
1144
1145    #[tokio::test]
1146    async fn batch_pipeline_returns_bookmark() {
1147        let source = IncrementalSource {
1148            records: vec![json!({"id": 1, "ts": "2024-12-01"})],
1149            bookmark: json!("2024-12-01"),
1150        };
1151        let sink = MockSink::new();
1152
1153        let result = Pipeline::new(&source, &sink).run().await.unwrap();
1154
1155        assert_eq!(result.records_written, 1);
1156        assert_eq!(result.bookmark, Some(json!("2024-12-01")));
1157    }
1158
1159    #[tokio::test]
1160    async fn batch_pipeline_empty_source() {
1161        let source = MockSource(vec![]);
1162        let sink = MockSink::new();
1163
1164        let result = Pipeline::new(&source, &sink).run().await.unwrap();
1165
1166        assert_eq!(result.records_written, 0);
1167        assert!(sink.written().is_empty());
1168    }
1169
1170    #[tokio::test]
1171    async fn batch_pipeline_source_error_propagates() {
1172        let source = FailingSource;
1173        let sink = MockSink::new();
1174
1175        let result = Pipeline::new(&source, &sink).run().await;
1176        assert!(result.is_err());
1177        assert!(sink.written().is_empty());
1178    }
1179
1180    #[tokio::test]
1181    async fn batch_pipeline_sink_error_propagates() {
1182        let source = MockSource(vec![json!({"id": 1})]);
1183        let sink = FailingSink;
1184
1185        let result = Pipeline::new(&source, &sink).run().await;
1186        assert!(result.is_err());
1187    }
1188
1189    #[tokio::test]
1190    async fn batch_pipeline_with_trait_objects() {
1191        let source: Box<dyn Source> = Box::new(MockSource(vec![json!({"id": 1})]));
1192        let sink: Box<dyn Sink> = Box::new(MockSink::new());
1193
1194        let result = Pipeline::new(source.as_ref(), sink.as_ref())
1195            .run()
1196            .await
1197            .unwrap();
1198
1199        assert_eq!(result.records_written, 1);
1200    }
1201
1202    // ── Streaming mode tests ────────────────────────────────────────────────
1203
1204    #[tokio::test]
1205    async fn stream_pipeline_writes_pages() {
1206        let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1207            Ok(StreamPage {
1208                records: vec![json!({"id": 1}), json!({"id": 2})],
1209                bookmark: None,
1210            }),
1211            Ok(StreamPage {
1212                records: vec![json!({"id": 3})],
1213                bookmark: None,
1214            }),
1215        ];
1216        let stream = futures::stream::iter(pages);
1217        let sink = MockSink::new();
1218
1219        let result = run_stream(stream, &sink, RunStreamOptions::new())
1220            .await
1221            .unwrap();
1222
1223        assert_eq!(result.records_written, 3);
1224        assert!(result.bookmark.is_none());
1225        assert_eq!(sink.written().len(), 3);
1226    }
1227
1228    #[tokio::test]
1229    async fn stream_pipeline_flushes_sink_on_source_error() {
1230        // Regression for #78/#3: a mid-stream source error must not skip the
1231        // sink flush. Without flushing, a buffered sink (e.g. Parquet, whose
1232        // footer is only written on flush) loses everything written so far.
1233        let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1234            Ok(StreamPage {
1235                records: vec![json!({"id": 1}), json!({"id": 2})],
1236                bookmark: None,
1237            }),
1238            Err(FaucetError::Source("transient blip mid-stream".into())),
1239        ];
1240        let stream = futures::stream::iter(pages);
1241        let sink = FlushTrackingSink::new();
1242
1243        let result = run_stream(stream, &sink, RunStreamOptions::new()).await;
1244
1245        // The original source error must still propagate.
1246        assert!(matches!(result, Err(FaucetError::Source(_))));
1247        // The good page must have been written before the error.
1248        assert_eq!(sink.written().len(), 2);
1249        // Crucially, the sink must have been flushed on the error path.
1250        assert!(
1251            sink.flush_count() >= 1,
1252            "sink must be flushed on the error path so partial output is durable"
1253        );
1254    }
1255
1256    #[tokio::test]
1257    async fn stream_pipeline_flushes_sink_on_cancel() {
1258        // #146 H16: a cooperative cancellation mid-run must stop polling, flush
1259        // the sink (so a Parquet footer / S3 multipart is completed rather than
1260        // orphaned), and return the partial result — NOT drop the run future,
1261        // which would flush nothing.
1262        use tokio_util::sync::CancellationToken;
1263
1264        // One page, then block forever — the only way out is the cancel token.
1265        let stream = Box::pin(async_stream::stream! {
1266            yield Ok(StreamPage {
1267                records: vec![json!({"id": 1}), json!({"id": 2})],
1268                bookmark: None,
1269            });
1270            futures::future::pending::<()>().await;
1271        });
1272        let sink = FlushTrackingSink::new();
1273
1274        let token = CancellationToken::new();
1275        let canceller = token.clone();
1276        tokio::spawn(async move {
1277            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1278            canceller.cancel();
1279        });
1280
1281        let result = run_stream(stream, &sink, RunStreamOptions::new().with_cancel(token))
1282            .await
1283            .expect("a cooperative cancel returns Ok with the partial result");
1284
1285        // The page written before cancellation survives, and the sink was
1286        // flushed so that output is durable.
1287        assert_eq!(result.records_written, 2);
1288        assert_eq!(sink.written().len(), 2);
1289        assert!(
1290            sink.flush_count() >= 1,
1291            "sink must be flushed on the cancel path so partial output is durable"
1292        );
1293    }
1294
1295    #[tokio::test]
1296    async fn stream_pipeline_empty() {
1297        let pages: Vec<Result<StreamPage, FaucetError>> = vec![];
1298        let stream = futures::stream::iter(pages);
1299        let sink = MockSink::new();
1300
1301        let result = run_stream(stream, &sink, RunStreamOptions::new())
1302            .await
1303            .unwrap();
1304
1305        assert_eq!(result.records_written, 0);
1306    }
1307
1308    #[tokio::test]
1309    async fn stream_pipeline_skips_empty_pages() {
1310        let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1311            Ok(StreamPage {
1312                records: vec![json!({"id": 1})],
1313                bookmark: None,
1314            }),
1315            Ok(StreamPage {
1316                records: vec![],
1317                bookmark: None,
1318            }),
1319            Ok(StreamPage {
1320                records: vec![json!({"id": 2})],
1321                bookmark: None,
1322            }),
1323        ];
1324        let stream = futures::stream::iter(pages);
1325        let sink = MockSink::new();
1326
1327        let result = run_stream(stream, &sink, RunStreamOptions::new())
1328            .await
1329            .unwrap();
1330
1331        assert_eq!(result.records_written, 2);
1332    }
1333
1334    #[tokio::test]
1335    async fn stream_pipeline_error_in_page_propagates() {
1336        let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1337            Ok(StreamPage {
1338                records: vec![json!({"id": 1})],
1339                bookmark: None,
1340            }),
1341            Err(FaucetError::HttpStatus {
1342                status: 500,
1343                url: "https://example.com".into(),
1344                body: "Internal Server Error".into(),
1345            }),
1346        ];
1347        let stream = futures::stream::iter(pages);
1348        let sink = MockSink::new();
1349
1350        let result = run_stream(stream, &sink, RunStreamOptions::new()).await;
1351        assert!(result.is_err());
1352        // First page was written before the error
1353        assert_eq!(sink.written().len(), 1);
1354    }
1355
1356    #[tokio::test]
1357    async fn stream_pipeline_sink_error_propagates() {
1358        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1359            records: vec![json!({"id": 1})],
1360            bookmark: None,
1361        })];
1362        let stream = futures::stream::iter(pages);
1363        let sink = FailingSink;
1364
1365        let result = run_stream(stream, &sink, RunStreamOptions::new()).await;
1366        assert!(result.is_err());
1367    }
1368
1369    #[tokio::test]
1370    async fn stream_pipeline_with_trait_object_sink() {
1371        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1372            records: vec![json!({"id": 1})],
1373            bookmark: None,
1374        })];
1375        let stream = futures::stream::iter(pages);
1376        let sink: Box<dyn Sink> = Box::new(MockSink::new());
1377
1378        let result = run_stream(stream, sink.as_ref(), RunStreamOptions::new())
1379            .await
1380            .unwrap();
1381        assert_eq!(result.records_written, 1);
1382    }
1383
1384    #[tokio::test]
1385    async fn stream_pipeline_persists_bookmark_when_page_carries_one() {
1386        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1387        let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1388            Ok(StreamPage {
1389                records: vec![json!({"id": 1})],
1390                bookmark: None,
1391            }),
1392            Ok(StreamPage {
1393                records: vec![json!({"id": 2})],
1394                bookmark: Some(json!("checkpoint-final")),
1395            }),
1396        ];
1397        let stream = futures::stream::iter(pages);
1398        let sink = MockSink::new();
1399
1400        let result = run_stream(
1401            stream,
1402            &sink,
1403            RunStreamOptions::new().with_state(Arc::clone(&store), "k"),
1404        )
1405        .await
1406        .unwrap();
1407
1408        assert_eq!(result.records_written, 2);
1409        assert_eq!(result.bookmark, Some(json!("checkpoint-final")));
1410        assert_eq!(
1411            store.get("k").await.unwrap(),
1412            Some(json!("checkpoint-final"))
1413        );
1414    }
1415
1416    #[tokio::test]
1417    async fn stream_pipeline_persists_per_page_bookmarks() {
1418        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1419        let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1420            Ok(StreamPage {
1421                records: vec![json!({"id": 1})],
1422                bookmark: Some(json!("tx-1")),
1423            }),
1424            Ok(StreamPage {
1425                records: vec![json!({"id": 2})],
1426                bookmark: Some(json!("tx-2")),
1427            }),
1428        ];
1429        let stream = futures::stream::iter(pages);
1430        let sink = MockSink::new();
1431
1432        run_stream(
1433            stream,
1434            &sink,
1435            RunStreamOptions::new().with_state(Arc::clone(&store), "k"),
1436        )
1437        .await
1438        .unwrap();
1439
1440        // Latest per-page bookmark wins.
1441        assert_eq!(store.get("k").await.unwrap(), Some(json!("tx-2")));
1442    }
1443
1444    // ── State-store integration tests ───────────────────────────────────────
1445
1446    use crate::state::{FileStateStore, MemoryStateStore, StateStore};
1447    use std::sync::Arc;
1448    use tempfile::TempDir;
1449
1450    /// Source that opts into state persistence. It records the bookmark it
1451    /// received via `apply_start_bookmark` so tests can verify the pipeline
1452    /// pushed the stored value back into it on resume.
1453    struct StatefulSource {
1454        key: String,
1455        records: Vec<Value>,
1456        new_bookmark: Value,
1457        seen_bookmark: std::sync::Mutex<Option<Value>>,
1458    }
1459
1460    impl StatefulSource {
1461        fn new(key: &str, records: Vec<Value>, new_bookmark: Value) -> Self {
1462            Self {
1463                key: key.into(),
1464                records,
1465                new_bookmark,
1466                seen_bookmark: std::sync::Mutex::new(None),
1467            }
1468        }
1469        fn observed_start(&self) -> Option<Value> {
1470            self.seen_bookmark.lock().unwrap().clone()
1471        }
1472    }
1473
1474    #[async_trait]
1475    impl Source for StatefulSource {
1476        async fn fetch_with_context(
1477            &self,
1478            _ctx: &std::collections::HashMap<String, Value>,
1479        ) -> Result<Vec<Value>, FaucetError> {
1480            Ok(self.records.clone())
1481        }
1482        async fn fetch_with_context_incremental(
1483            &self,
1484            _ctx: &std::collections::HashMap<String, Value>,
1485        ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
1486            Ok((self.records.clone(), Some(self.new_bookmark.clone())))
1487        }
1488        fn state_key(&self) -> Option<String> {
1489            Some(self.key.clone())
1490        }
1491        async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
1492            *self.seen_bookmark.lock().unwrap() = Some(bookmark);
1493            Ok(())
1494        }
1495    }
1496
1497    #[tokio::test]
1498    async fn pipeline_with_state_store_persists_bookmark_after_sink() {
1499        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1500        let source = StatefulSource::new(
1501            "github_issues",
1502            vec![json!({"id": 1, "ts": "2026-05-01"})],
1503            json!("2026-05-01"),
1504        );
1505        let sink = MockSink::new();
1506        let result = Pipeline::new(&source, &sink)
1507            .with_state_store(Arc::clone(&store))
1508            .run()
1509            .await
1510            .unwrap();
1511
1512        assert_eq!(result.records_written, 1);
1513        assert_eq!(result.bookmark, Some(json!("2026-05-01")));
1514        // Stored value matches what the source returned.
1515        let stored = store.get("github_issues").await.unwrap();
1516        assert_eq!(stored, Some(json!("2026-05-01")));
1517    }
1518
1519    #[tokio::test]
1520    async fn pipeline_with_state_store_resumes_from_stored_bookmark() {
1521        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1522        store
1523            .put("github_issues", &json!("2026-04-30"))
1524            .await
1525            .unwrap();
1526
1527        let source =
1528            StatefulSource::new("github_issues", vec![json!({"id": 2})], json!("2026-05-01"));
1529        let sink = MockSink::new();
1530        Pipeline::new(&source, &sink)
1531            .with_state_store(Arc::clone(&store))
1532            .run()
1533            .await
1534            .unwrap();
1535
1536        // The pipeline pushed the previously-stored bookmark back into the source.
1537        assert_eq!(source.observed_start(), Some(json!("2026-04-30")));
1538        // And then overwrote it with the new value from this run.
1539        assert_eq!(
1540            store.get("github_issues").await.unwrap(),
1541            Some(json!("2026-05-01"))
1542        );
1543    }
1544
1545    #[tokio::test]
1546    async fn pipeline_with_state_store_does_not_persist_when_sink_fails() {
1547        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1548        let source = StatefulSource::new("k", vec![json!({"id": 1})], json!("2026-05-01"));
1549        let sink = FailingSink;
1550
1551        let result = Pipeline::new(&source, &sink)
1552            .with_state_store(Arc::clone(&store))
1553            .run()
1554            .await;
1555        assert!(result.is_err());
1556        assert!(store.get("k").await.unwrap().is_none());
1557    }
1558
1559    #[tokio::test]
1560    async fn pipeline_with_state_store_no_state_key_means_no_persist() {
1561        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1562        let source = IncrementalSource {
1563            records: vec![json!({"id": 1})],
1564            bookmark: json!("ignored"),
1565        };
1566        let sink = MockSink::new();
1567        Pipeline::new(&source, &sink)
1568            .with_state_store(Arc::clone(&store))
1569            .run()
1570            .await
1571            .unwrap();
1572        // IncrementalSource doesn't override state_key, so nothing was persisted.
1573        // Cross-check that no keys exist by trying a likely one.
1574        assert!(store.get("anything").await.unwrap().is_none());
1575    }
1576
1577    #[tokio::test]
1578    async fn pipeline_with_state_store_skips_persist_when_bookmark_is_none() {
1579        let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1580        struct NoBookmarkSource;
1581        #[async_trait]
1582        impl Source for NoBookmarkSource {
1583            async fn fetch_with_context(
1584                &self,
1585                _ctx: &std::collections::HashMap<String, Value>,
1586            ) -> Result<Vec<Value>, FaucetError> {
1587                Ok(vec![json!({"id": 1})])
1588            }
1589            fn state_key(&self) -> Option<String> {
1590                Some("k".into())
1591            }
1592        }
1593        let source = NoBookmarkSource;
1594        let sink = MockSink::new();
1595        Pipeline::new(&source, &sink)
1596            .with_state_store(Arc::clone(&store))
1597            .run()
1598            .await
1599            .unwrap();
1600        assert!(store.get("k").await.unwrap().is_none());
1601    }
1602
1603    // ── Pipeline::run drives stream_pages ──────────────────────────────────
1604
1605    /// A source with a custom `stream_pages` impl that yields three pages.
1606    /// Used to prove `Pipeline::run` drives the streaming path.
1607    struct PagedSource;
1608
1609    #[async_trait]
1610    impl Source for PagedSource {
1611        async fn fetch_with_context(
1612            &self,
1613            _ctx: &std::collections::HashMap<String, Value>,
1614        ) -> Result<Vec<Value>, FaucetError> {
1615            // Should never be called when stream_pages is overridden.
1616            unreachable!("Pipeline::run must drive stream_pages, not fetch_with_context");
1617        }
1618        fn stream_pages<'a>(
1619            &'a self,
1620            _ctx: &'a std::collections::HashMap<String, Value>,
1621            _batch_size: usize,
1622        ) -> std::pin::Pin<
1623            Box<dyn futures_core::Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>,
1624        > {
1625            Box::pin(async_stream::try_stream! {
1626                yield StreamPage { records: vec![json!({"i": 1})], bookmark: None };
1627                yield StreamPage { records: vec![json!({"i": 2})], bookmark: None };
1628                yield StreamPage { records: vec![json!({"i": 3})], bookmark: Some(json!("final")) };
1629            })
1630        }
1631    }
1632
1633    /// Sink that counts how many distinct write_batch calls happen.
1634    struct CountingSink {
1635        calls: std::sync::Mutex<Vec<usize>>,
1636    }
1637
1638    impl CountingSink {
1639        fn new() -> Self {
1640            Self {
1641                calls: std::sync::Mutex::new(Vec::new()),
1642            }
1643        }
1644        fn call_count(&self) -> usize {
1645            self.calls.lock().unwrap().len()
1646        }
1647    }
1648
1649    #[async_trait]
1650    impl Sink for CountingSink {
1651        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
1652            self.calls.lock().unwrap().push(records.len());
1653            Ok(records.len())
1654        }
1655    }
1656
1657    #[tokio::test]
1658    async fn pipeline_run_drives_stream_pages() {
1659        let source = PagedSource;
1660        let sink = CountingSink::new();
1661
1662        let result = Pipeline::new(&source, &sink).run().await.unwrap();
1663
1664        // Three pages of one record each → three sink calls, three records.
1665        assert_eq!(sink.call_count(), 3);
1666        assert_eq!(result.records_written, 3);
1667        assert_eq!(result.bookmark, Some(json!("final")));
1668    }
1669
1670    #[tokio::test]
1671    async fn pipeline_with_file_state_store_round_trips_across_runs() {
1672        let dir = TempDir::new().unwrap();
1673        let store: Arc<dyn StateStore> = Arc::new(FileStateStore::new(dir.path()));
1674
1675        // Run 1: nothing stored yet, persist new bookmark.
1676        let s1 = StatefulSource::new("k", vec![json!({"i": 1})], json!("v1"));
1677        let sink1 = MockSink::new();
1678        Pipeline::new(&s1, &sink1)
1679            .with_state_store(Arc::clone(&store))
1680            .run()
1681            .await
1682            .unwrap();
1683        assert_eq!(s1.observed_start(), None);
1684        assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
1685
1686        // Run 2: resume from v1, persist v2.
1687        let s2 = StatefulSource::new("k", vec![json!({"i": 2})], json!("v2"));
1688        let sink2 = MockSink::new();
1689        Pipeline::new(&s2, &sink2)
1690            .with_state_store(Arc::clone(&store))
1691            .run()
1692            .await
1693            .unwrap();
1694        assert_eq!(s2.observed_start(), Some(json!("v1")));
1695        assert_eq!(store.get("k").await.unwrap(), Some(json!("v2")));
1696    }
1697
1698    #[tokio::test]
1699    #[allow(clippy::await_holding_lock)]
1700    async fn pipeline_run_increments_runs_total() {
1701        use crate::observability::decorator::source_tests::{LOCK, snapshotter};
1702        use metrics_util::debugging::DebugValue;
1703        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1704        let snap = snapshotter();
1705
1706        let source = MockSource(vec![json!({"i": 1})]);
1707        let sink = MockSink::new();
1708        let _ = Pipeline::new(&source, &sink)
1709            .with_name("test-pipeline")
1710            .with_row("rowA")
1711            .run()
1712            .await
1713            .unwrap();
1714
1715        let snapshot = snap.snapshot();
1716        let found = snapshot.into_vec().into_iter().any(
1717            |(key, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
1718                key.key().name() == "faucet_pipeline_runs_total"
1719                    && key.key().labels().any(|l: &metrics::Label| {
1720                        l.key() == "pipeline" && l.value() == "test-pipeline"
1721                    })
1722                    && key
1723                        .key()
1724                        .labels()
1725                        .any(|l: &metrics::Label| l.key() == "row" && l.value() == "rowA")
1726                    && key
1727                        .key()
1728                        .labels()
1729                        .any(|l: &metrics::Label| l.key() == "status" && l.value() == "ok")
1730                    && matches!(v, DebugValue::Counter(c) if c >= 1)
1731            },
1732        );
1733        assert!(
1734            found,
1735            "expected faucet_pipeline_runs_total{{pipeline=test-pipeline, row=rowA, status=ok}}"
1736        );
1737    }
1738
1739    #[tokio::test]
1740    #[allow(clippy::await_holding_lock)]
1741    async fn pipeline_failure_attaches_kind_label_to_runs_total() {
1742        use crate::observability::decorator::source_tests::{LOCK, snapshotter};
1743        use metrics_util::debugging::DebugValue;
1744        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1745        let snap = snapshotter();
1746
1747        let source = FailingSource;
1748        let sink = MockSink::new();
1749        let _ = Pipeline::new(&source, &sink)
1750            .with_name("err-pipeline")
1751            .with_row("rowE")
1752            .run()
1753            .await;
1754
1755        let snapshot = snap.snapshot();
1756        let found = snapshot.into_vec().into_iter().any(
1757            |(key, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
1758                key.key().name() == "faucet_pipeline_runs_total"
1759                    && key.key().labels().any(|l: &metrics::Label| {
1760                        l.key() == "pipeline" && l.value() == "err-pipeline"
1761                    })
1762                    && key
1763                        .key()
1764                        .labels()
1765                        .any(|l: &metrics::Label| l.key() == "status" && l.value() == "err")
1766                    && key
1767                        .key()
1768                        .labels()
1769                        .any(|l: &metrics::Label| l.key() == "kind" && l.value() == "Auth")
1770                    && matches!(v, DebugValue::Counter(c) if c >= 1)
1771            },
1772        );
1773        assert!(
1774            found,
1775            "expected faucet_pipeline_runs_total{{status=err, kind=Auth}} for failing source"
1776        );
1777    }
1778
1779    #[tokio::test]
1780    #[allow(clippy::await_holding_lock)]
1781    async fn pipeline_run_emits_start_time_gauge() {
1782        use crate::observability::decorator::source_tests::{LOCK, snapshotter};
1783        use metrics_util::debugging::DebugValue;
1784        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1785        let snap = snapshotter();
1786
1787        let source = MockSource(vec![json!({"i": 1})]);
1788        let sink = MockSink::new();
1789        let before = std::time::SystemTime::now()
1790            .duration_since(std::time::UNIX_EPOCH)
1791            .map(|d| d.as_secs_f64())
1792            .unwrap_or(0.0);
1793        let _ = Pipeline::new(&source, &sink)
1794            .with_name("start-time-pipeline")
1795            .with_row("rowS")
1796            .run()
1797            .await
1798            .unwrap();
1799
1800        let snapshot = snap.snapshot();
1801        let found = snapshot.into_vec().into_iter().any(
1802            |(key, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
1803                if key.key().name() != "faucet_pipeline_start_time_unix_seconds" {
1804                    return false;
1805                }
1806                let labels_match = key.key().labels().any(|l: &metrics::Label| {
1807                    l.key() == "pipeline" && l.value() == "start-time-pipeline"
1808                }) && key
1809                    .key()
1810                    .labels()
1811                    .any(|l: &metrics::Label| l.key() == "row" && l.value() == "rowS");
1812                if !labels_match {
1813                    return false;
1814                }
1815                matches!(v, DebugValue::Gauge(g) if g.into_inner() >= before)
1816            },
1817        );
1818        assert!(
1819            found,
1820            "expected faucet_pipeline_start_time_unix_seconds gauge >= test-start timestamp"
1821        );
1822    }
1823
1824    #[tokio::test]
1825    #[allow(clippy::await_holding_lock)]
1826    async fn register_build_info_sets_version_gauge() {
1827        use crate::observability::decorator::source_tests::{LOCK, snapshotter};
1828        use metrics_util::debugging::DebugValue;
1829        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1830        let snap = snapshotter();
1831
1832        crate::observability::register_build_info();
1833
1834        let snapshot = snap.snapshot();
1835        let found = snapshot.into_vec().into_iter().any(
1836            |(key, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
1837                key.key().name() == "faucet_build_info"
1838                    && key.key().labels().any(|l: &metrics::Label| {
1839                        l.key() == "version" && l.value() == env!("CARGO_PKG_VERSION")
1840                    })
1841                    && matches!(v, DebugValue::Gauge(g) if (g.into_inner() - 1.0).abs() < f64::EPSILON)
1842            },
1843        );
1844        assert!(
1845            found,
1846            "expected faucet_build_info{{version=CARGO_PKG_VERSION}} = 1.0 after register_build_info()"
1847        );
1848    }
1849
1850    // ── DLQ routing tests ──────────────────────────────────────────────────
1851
1852    use crate::dlq::{DlqConfig, OnBatchError};
1853
1854    /// Sink that returns mixed per-row outcomes: failure indices come from
1855    /// the constructor; everything else succeeds. Captures the rows that
1856    /// *would* have committed to the main sink.
1857    struct PartialSink {
1858        fail_indices: std::sync::Mutex<Vec<usize>>,
1859        committed: std::sync::Mutex<Vec<Value>>,
1860    }
1861
1862    impl PartialSink {
1863        fn new(fail_indices: Vec<usize>) -> Self {
1864            Self {
1865                fail_indices: std::sync::Mutex::new(fail_indices),
1866                committed: std::sync::Mutex::new(Vec::new()),
1867            }
1868        }
1869    }
1870
1871    #[async_trait]
1872    impl Sink for PartialSink {
1873        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
1874            unreachable!("PartialSink only overrides write_batch_partial");
1875        }
1876        async fn write_batch_partial(
1877            &self,
1878            records: &[Value],
1879        ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
1880            let fails: std::collections::HashSet<usize> =
1881                self.fail_indices.lock().unwrap().iter().copied().collect();
1882            let mut outcomes = Vec::with_capacity(records.len());
1883            for (i, rec) in records.iter().enumerate() {
1884                if fails.contains(&i) {
1885                    outcomes.push(Err(FaucetError::Sink(format!("row {i} rejected"))));
1886                } else {
1887                    self.committed.lock().unwrap().push(rec.clone());
1888                    outcomes.push(Ok(()));
1889                }
1890            }
1891            Ok(outcomes)
1892        }
1893    }
1894
1895    #[tokio::test]
1896    async fn dlq_routes_only_failed_rows_for_partial_success_sink() {
1897        let main = PartialSink::new(vec![1, 3]); // 4 rows, indices 1 and 3 fail
1898        let dlq = std::sync::Arc::new(MockSink::new());
1899        let dlq_cfg = DlqConfig::new(dlq.clone());
1900
1901        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1902            records: (0..4).map(|i| json!({"i": i})).collect(),
1903            bookmark: None,
1904        })];
1905        let stream = futures::stream::iter(pages);
1906        let result = run_stream(stream, &main, RunStreamOptions::new().with_dlq(dlq_cfg))
1907            .await
1908            .unwrap();
1909
1910        assert_eq!(result.records_written, 2); // 0 and 2 committed
1911        assert_eq!(main.committed.lock().unwrap().len(), 2);
1912        let envelopes = dlq.0.lock().unwrap();
1913        assert_eq!(envelopes.len(), 2);
1914        assert_eq!(envelopes[0]["payload"]["i"], 1);
1915        assert_eq!(envelopes[0]["record_index"], 1);
1916        assert_eq!(envelopes[1]["payload"]["i"], 3);
1917        assert_eq!(envelopes[1]["record_index"], 3);
1918        let stats = result.dlq.unwrap();
1919        assert_eq!(stats.records_dlq, 2);
1920        assert_eq!(stats.pages_with_failures, 1);
1921    }
1922
1923    #[tokio::test]
1924    async fn dlq_propagate_policy_bubbles_outer_err() {
1925        let main = FailingSink;
1926        let dlq = std::sync::Arc::new(MockSink::new());
1927        let mut dlq_cfg = DlqConfig::new(dlq.clone());
1928        dlq_cfg.on_batch_error = OnBatchError::Propagate;
1929
1930        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1931            records: vec![json!({"i": 0}), json!({"i": 1})],
1932            bookmark: Some(json!("v1")),
1933        })];
1934        let stream = futures::stream::iter(pages);
1935        let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
1936        let result = run_stream(
1937            stream,
1938            &main,
1939            RunStreamOptions::new()
1940                .with_dlq(dlq_cfg)
1941                .with_state(std::sync::Arc::clone(&store), "k"),
1942        )
1943        .await;
1944        assert!(matches!(result, Err(FaucetError::Sink(_))));
1945        assert!(dlq.0.lock().unwrap().is_empty());
1946        // Bookmark must NOT be persisted on a propagated failure.
1947        assert!(store.get("k").await.unwrap().is_none());
1948    }
1949
1950    #[tokio::test]
1951    async fn dlq_dlq_all_policy_routes_every_row_on_outer_err() {
1952        let main = FailingSink;
1953        let dlq = std::sync::Arc::new(MockSink::new());
1954        let mut dlq_cfg = DlqConfig::new(dlq.clone());
1955        dlq_cfg.on_batch_error = OnBatchError::DlqAll;
1956
1957        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1958            records: vec![json!({"i": 0}), json!({"i": 1}), json!({"i": 2})],
1959            bookmark: Some(json!("v1")),
1960        })];
1961        let stream = futures::stream::iter(pages);
1962        let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
1963        let result = run_stream(
1964            stream,
1965            &main,
1966            RunStreamOptions::new()
1967                .with_dlq(dlq_cfg)
1968                .with_state(std::sync::Arc::clone(&store), "k"),
1969        )
1970        .await
1971        .unwrap();
1972        assert_eq!(result.records_written, 0);
1973        {
1974            let envelopes = dlq.0.lock().unwrap();
1975            assert_eq!(envelopes.len(), 3);
1976            // Every envelope's error.message includes the underlying message.
1977            for env in envelopes.iter() {
1978                let msg = env["error"]["message"].as_str().unwrap();
1979                assert!(msg.contains("write failed"), "got: {msg}");
1980            }
1981        }
1982        assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
1983        assert_eq!(result.dlq.unwrap().records_dlq, 3);
1984    }
1985
1986    #[tokio::test]
1987    async fn dlq_per_page_budget_exceeded_aborts() {
1988        let main = PartialSink::new(vec![0, 1, 2]);
1989        let dlq = std::sync::Arc::new(MockSink::new());
1990        let mut dlq_cfg = DlqConfig::new(dlq.clone());
1991        dlq_cfg.max_failures_per_page = Some(2);
1992
1993        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1994            records: (0..3).map(|i| json!({"i": i})).collect(),
1995            bookmark: None,
1996        })];
1997        let stream = futures::stream::iter(pages);
1998        let result = run_stream(stream, &main, RunStreamOptions::new().with_dlq(dlq_cfg)).await;
1999        assert!(
2000            matches!(&result, Err(FaucetError::Sink(m)) if m.contains("per-page budget exceeded")),
2001            "got: {result:?}"
2002        );
2003    }
2004
2005    #[tokio::test]
2006    async fn dlq_total_budget_exceeded_aborts_on_later_page() {
2007        let pages: Vec<Result<StreamPage, FaucetError>> = vec![
2008            Ok(StreamPage {
2009                records: (0..3).map(|i| json!({"i": i})).collect(),
2010                bookmark: None,
2011            }),
2012            Ok(StreamPage {
2013                records: (3..6).map(|i| json!({"i": i})).collect(),
2014                bookmark: None,
2015            }),
2016        ];
2017        // Fail every row across both pages.
2018        let main = PartialSink::new(vec![0, 1, 2]); // applied per page
2019        let dlq = std::sync::Arc::new(MockSink::new());
2020        let mut dlq_cfg = DlqConfig::new(dlq.clone());
2021        dlq_cfg.max_failures_total = Some(4);
2022
2023        let stream = futures::stream::iter(pages);
2024        let result = run_stream(stream, &main, RunStreamOptions::new().with_dlq(dlq_cfg)).await;
2025        assert!(
2026            matches!(&result, Err(FaucetError::Sink(m)) if m.contains("total budget exceeded")),
2027            "got: {result:?}"
2028        );
2029    }
2030
2031    #[tokio::test]
2032    async fn dlq_per_page_budget_exceeded_commits_page_before_aborting() {
2033        // M4 (#146): write_batch_partial already commits the survivors to the
2034        // main sink. When the per-page budget then trips, the run must finish
2035        // committing the page — route its failures to the DLQ and persist the
2036        // bookmark — BEFORE aborting, so the committed survivors do NOT
2037        // re-deliver on the next run and the failed rows are not lost.
2038        let main = PartialSink::new(vec![1, 2]); // rows 1,2 fail; row 0 commits
2039        let dlq = std::sync::Arc::new(MockSink::new());
2040        let mut dlq_cfg = DlqConfig::new(dlq.clone());
2041        dlq_cfg.max_failures_per_page = Some(1); // 2 failures > 1 → trips
2042
2043        let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2044        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2045            records: (0..3).map(|i| json!({ "i": i })).collect(),
2046            bookmark: Some(json!("v1")),
2047        })];
2048        let stream = futures::stream::iter(pages);
2049        let result = run_stream(
2050            stream,
2051            &main,
2052            RunStreamOptions::new()
2053                .with_dlq(dlq_cfg)
2054                .with_state(std::sync::Arc::clone(&store), "k"),
2055        )
2056        .await;
2057
2058        // Run still aborts with the budget error.
2059        assert!(
2060            matches!(&result, Err(FaucetError::Sink(m)) if m.contains("per-page budget exceeded")),
2061            "got: {result:?}"
2062        );
2063        // The survivor (row 0) was committed to the main sink.
2064        assert_eq!(main.committed.lock().unwrap().len(), 1);
2065        // The two failures were routed to the DLQ (not lost on abort).
2066        assert_eq!(dlq.0.lock().unwrap().len(), 2);
2067        // The bookmark was persisted, so the survivor will NOT re-deliver.
2068        assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
2069    }
2070
2071    #[tokio::test]
2072    async fn dlq_total_budget_exceeded_commits_tripping_page_before_aborting() {
2073        // M4 (#146): same guarantee for the cumulative total budget — the page
2074        // that crosses the threshold is committed fully (failures→DLQ, bookmark
2075        // persisted) before the run aborts.
2076        let main = PartialSink::new(vec![1, 2]); // rows 1,2 fail; row 0 commits
2077        let dlq = std::sync::Arc::new(MockSink::new());
2078        let mut dlq_cfg = DlqConfig::new(dlq.clone());
2079        dlq_cfg.max_failures_total = Some(1); // 2 failures > 1 → trips
2080
2081        let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2082        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2083            records: (0..3).map(|i| json!({ "i": i })).collect(),
2084            bookmark: Some(json!("v1")),
2085        })];
2086        let stream = futures::stream::iter(pages);
2087        let result = run_stream(
2088            stream,
2089            &main,
2090            RunStreamOptions::new()
2091                .with_dlq(dlq_cfg)
2092                .with_state(std::sync::Arc::clone(&store), "k"),
2093        )
2094        .await;
2095
2096        assert!(
2097            matches!(&result, Err(FaucetError::Sink(m)) if m.contains("total budget exceeded")),
2098            "got: {result:?}"
2099        );
2100        assert_eq!(main.committed.lock().unwrap().len(), 1);
2101        assert_eq!(dlq.0.lock().unwrap().len(), 2);
2102        assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
2103    }
2104
2105    /// DLQ sink that always fails. Used to assert the router does not
2106    /// recurse into itself.
2107    struct FailingDlqSink;
2108    #[async_trait]
2109    impl Sink for FailingDlqSink {
2110        async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
2111            Err(FaucetError::Sink("dlq disk full".into()))
2112        }
2113    }
2114
2115    /// DLQ sink that succeeds on write but fails on flush. Used to assert
2116    /// the router wraps DLQ flush errors and bails without persisting the
2117    /// bookmark.
2118    struct FailingFlushDlqSink {
2119        written: std::sync::Mutex<Vec<Value>>,
2120    }
2121    impl FailingFlushDlqSink {
2122        fn new() -> Self {
2123            Self {
2124                written: std::sync::Mutex::new(Vec::new()),
2125            }
2126        }
2127    }
2128    #[async_trait]
2129    impl Sink for FailingFlushDlqSink {
2130        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
2131            self.written.lock().unwrap().extend(records.iter().cloned());
2132            Ok(records.len())
2133        }
2134        async fn flush(&self) -> Result<(), FaucetError> {
2135            Err(FaucetError::Sink("dlq flush failed".into()))
2136        }
2137    }
2138
2139    #[tokio::test]
2140    async fn dlq_sink_failure_is_fatal_no_recursion() {
2141        let main = PartialSink::new(vec![0]);
2142        let dlq: std::sync::Arc<dyn Sink> = std::sync::Arc::new(FailingDlqSink);
2143        let dlq_cfg = DlqConfig::new(dlq);
2144
2145        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2146            records: vec![json!({"i": 0}), json!({"i": 1})],
2147            bookmark: Some(json!("v1")),
2148        })];
2149        let stream = futures::stream::iter(pages);
2150        let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2151        let result = run_stream(
2152            stream,
2153            &main,
2154            RunStreamOptions::new()
2155                .with_dlq(dlq_cfg)
2156                .with_state(std::sync::Arc::clone(&store), "k"),
2157        )
2158        .await;
2159        assert!(
2160            matches!(&result, Err(FaucetError::Sink(m)) if m.contains("DLQ sink write failed")),
2161            "got: {result:?}"
2162        );
2163        assert!(store.get("k").await.unwrap().is_none());
2164    }
2165
2166    #[tokio::test]
2167    async fn dlq_bookmark_advances_only_after_both_flushes() {
2168        let main = PartialSink::new(vec![1]); // row 1 fails, row 0 commits
2169        let dlq = std::sync::Arc::new(MockSink::new());
2170        let dlq_cfg = DlqConfig::new(dlq.clone());
2171
2172        let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2173        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2174            records: vec![json!({"i": 0}), json!({"i": 1})],
2175            bookmark: Some(json!("v1")),
2176        })];
2177        let stream = futures::stream::iter(pages);
2178        run_stream(
2179            stream,
2180            &main,
2181            RunStreamOptions::new()
2182                .with_dlq(dlq_cfg)
2183                .with_state(std::sync::Arc::clone(&store), "k"),
2184        )
2185        .await
2186        .unwrap();
2187        assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
2188        assert_eq!(dlq.0.lock().unwrap().len(), 1);
2189        assert_eq!(main.committed.lock().unwrap().len(), 1);
2190    }
2191
2192    #[tokio::test]
2193    async fn dlq_disabled_pipeline_behaves_identically_to_today() {
2194        // Regression guard: omitting DLQ keeps existing behavior bit-identical.
2195        let main = MockSink::new();
2196        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2197            records: vec![json!({"i": 0}), json!({"i": 1})],
2198            bookmark: None,
2199        })];
2200        let stream = futures::stream::iter(pages);
2201        let result = run_stream(stream, &main, RunStreamOptions::new())
2202            .await
2203            .unwrap();
2204        assert_eq!(result.records_written, 2);
2205        assert!(result.dlq.is_none());
2206    }
2207
2208    #[tokio::test]
2209    async fn dlq_per_page_flush_failure_is_fatal_and_blocks_bookmark() {
2210        // Per-page flush path: page carries a bookmark, row 1 fails, the
2211        // DLQ write succeeds but the DLQ flush at the bookmark gate errors.
2212        // The pipeline must bail with "DLQ sink flush failed" and the
2213        // bookmark must NOT be persisted.
2214        let main = PartialSink::new(vec![1]);
2215        let dlq: std::sync::Arc<dyn Sink> = std::sync::Arc::new(FailingFlushDlqSink::new());
2216        let dlq_cfg = DlqConfig::new(dlq);
2217
2218        let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2219        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2220            records: vec![json!({"i": 0}), json!({"i": 1})],
2221            bookmark: Some(json!("v1")),
2222        })];
2223        let stream = futures::stream::iter(pages);
2224        let result = run_stream(
2225            stream,
2226            &main,
2227            RunStreamOptions::new()
2228                .with_dlq(dlq_cfg)
2229                .with_state(std::sync::Arc::clone(&store), "k"),
2230        )
2231        .await;
2232        assert!(
2233            matches!(&result, Err(FaucetError::Sink(m)) if m.contains("DLQ sink flush failed")),
2234            "got: {result:?}"
2235        );
2236        assert!(store.get("k").await.unwrap().is_none());
2237    }
2238
2239    #[tokio::test]
2240    async fn dlq_end_of_stream_flush_failure_is_fatal() {
2241        // End-of-stream flush path: no page carries a bookmark, but DLQ
2242        // received envelopes during the run. The final post-loop flush of
2243        // the DLQ sink errors. The pipeline must bail with "DLQ sink flush
2244        // failed".
2245        let main = PartialSink::new(vec![1]);
2246        let dlq: std::sync::Arc<dyn Sink> = std::sync::Arc::new(FailingFlushDlqSink::new());
2247        let dlq_cfg = DlqConfig::new(dlq);
2248
2249        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2250            records: vec![json!({"i": 0}), json!({"i": 1})],
2251            bookmark: None,
2252        })];
2253        let stream = futures::stream::iter(pages);
2254        let result = run_stream(stream, &main, RunStreamOptions::new().with_dlq(dlq_cfg)).await;
2255        assert!(
2256            matches!(&result, Err(FaucetError::Sink(m)) if m.contains("DLQ sink flush failed")),
2257            "got: {result:?}"
2258        );
2259    }
2260
2261    #[tokio::test]
2262    #[allow(clippy::await_holding_lock)]
2263    async fn dlq_emits_records_total_and_pages_total() {
2264        use crate::observability::decorator::source_tests::{LOCK, snapshotter};
2265        use metrics_util::debugging::DebugValue;
2266
2267        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
2268        let snap = snapshotter();
2269
2270        let source = MockSource(vec![json!({"i": 0}), json!({"i": 1})]);
2271        let main = PartialSink::new(vec![1]);
2272        let dlq = std::sync::Arc::new(MockSink::new());
2273        let _ = Pipeline::new(&source, &main)
2274            .with_name("p_dlq_metrics")
2275            .with_row("r1")
2276            .with_dlq(DlqConfig::new(dlq.clone()))
2277            .run()
2278            .await
2279            .unwrap();
2280
2281        let snapshot = snap.snapshot();
2282        let mut saw_records = false;
2283        let mut saw_pages = false;
2284        for (k, _u, _d, v) in snapshot.into_vec() {
2285            let key = k.key();
2286            let labels = key.labels().collect::<Vec<_>>();
2287            let has = |k: &str, v: &str| labels.iter().any(|l| l.key() == k && l.value() == v);
2288            if key.name() == "faucet_sink_dlq_records_total"
2289                && has("pipeline", "p_dlq_metrics")
2290                && has("row", "r1")
2291                && matches!(v, DebugValue::Counter(c) if c >= 1)
2292            {
2293                saw_records = true;
2294            }
2295            if key.name() == "faucet_sink_dlq_pages_total"
2296                && has("pipeline", "p_dlq_metrics")
2297                && matches!(v, DebugValue::Counter(c) if c >= 1)
2298            {
2299                saw_pages = true;
2300            }
2301        }
2302        assert!(saw_records, "faucet_sink_dlq_records_total not emitted");
2303        assert!(saw_pages, "faucet_sink_dlq_pages_total not emitted");
2304    }
2305
2306    #[tokio::test]
2307    #[allow(clippy::await_holding_lock)]
2308    async fn dlq_budget_exceeded_emits_counter() {
2309        use crate::observability::decorator::source_tests::{LOCK, snapshotter};
2310        use metrics_util::debugging::DebugValue;
2311
2312        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
2313        let snap = snapshotter();
2314
2315        let source = MockSource((0..3).map(|i| json!({"i": i})).collect());
2316        let main = PartialSink::new(vec![0, 1, 2]);
2317        let dlq = std::sync::Arc::new(MockSink::new());
2318        let mut cfg = DlqConfig::new(dlq);
2319        cfg.max_failures_per_page = Some(1);
2320        let _ = Pipeline::new(&source, &main)
2321            .with_name("p_budget")
2322            .with_dlq(cfg)
2323            .run()
2324            .await;
2325
2326        let snapshot = snap.snapshot();
2327        let saw = snapshot.into_vec().into_iter().any(|(k, _, _, v)| {
2328            k.key().name() == "faucet_sink_dlq_budget_exceeded_total"
2329                && k.key()
2330                    .labels()
2331                    .any(|l| l.key() == "scope" && l.value() == "per_page")
2332                && matches!(v, DebugValue::Counter(c) if c >= 1)
2333        });
2334        assert!(saw, "faucet_sink_dlq_budget_exceeded_total not emitted");
2335    }
2336
2337    #[tokio::test]
2338    async fn pipeline_run_with_dlq_routes_partial_failures_end_to_end() {
2339        // Source: 3 records. Main sink: fails index 1. DLQ: in-memory.
2340        let source = MockSource(vec![json!({"i": 0}), json!({"i": 1}), json!({"i": 2})]);
2341        let main = PartialSink::new(vec![1]);
2342        let dlq = std::sync::Arc::new(MockSink::new());
2343
2344        let result = Pipeline::new(&source, &main)
2345            .with_dlq(DlqConfig::new(dlq.clone()))
2346            .run()
2347            .await
2348            .unwrap();
2349
2350        assert_eq!(result.records_written, 2);
2351        let stats = result.dlq.unwrap();
2352        assert_eq!(stats.records_dlq, 1);
2353        {
2354            let dlq_records = dlq.0.lock().unwrap();
2355            assert_eq!(dlq_records.len(), 1);
2356        }
2357    }
2358
2359    // ── Quality routing tests ──────────────────────────────────────────────
2360
2361    #[cfg(feature = "quality")]
2362    #[tokio::test]
2363    async fn quality_quarantines_to_dlq_and_writes_survivors() {
2364        use crate::dlq::DlqConfig;
2365        use crate::quality::{CompiledQuality, OnFailure, QualitySpec, RecordCheck};
2366
2367        let main = Arc::new(MockSink::new());
2368        let dlq_sink = Arc::new(MockSink::new());
2369        let spec = QualitySpec {
2370            record: vec![RecordCheck::NotNull {
2371                field: "id".into(),
2372                treat_missing_as_null: true,
2373                on_failure: OnFailure::Quarantine,
2374            }],
2375            batch: vec![],
2376        };
2377        let quality = Arc::new(CompiledQuality::compile(&spec).unwrap());
2378        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2379            records: vec![json!({"id": 1}), json!({"id": null}), json!({"id": 3})],
2380            bookmark: None,
2381        })];
2382        let opts = RunStreamOptions::new()
2383            .with_dlq(DlqConfig::new(dlq_sink.clone()))
2384            .with_quality(quality);
2385        let result = run_stream(futures::stream::iter(pages), main.as_ref(), opts)
2386            .await
2387            .unwrap();
2388
2389        assert_eq!(result.records_written, 2); // survivors
2390        assert_eq!(main.written(), vec![json!({"id": 1}), json!({"id": 3})]);
2391        // one quarantined record reached the DLQ with a QualityFailure envelope
2392        let dlq = dlq_sink.written();
2393        assert_eq!(dlq.len(), 1);
2394        assert_eq!(dlq[0]["error"]["kind"], "QualityFailure");
2395        assert_eq!(result.dlq.unwrap().records_dlq, 1);
2396    }
2397
2398    #[cfg(feature = "quality")]
2399    #[tokio::test]
2400    #[allow(clippy::await_holding_lock)]
2401    async fn quality_only_page_emits_quality_reason() {
2402        // A page whose DLQ traffic is entirely quality-sourced (the main sink
2403        // accepts every survivor) must label `faucet_sink_dlq_pages_total`
2404        // with `reason="quality"`, not `partial`.
2405        use crate::dlq::DlqConfig;
2406        use crate::observability::decorator::source_tests::{LOCK, snapshotter};
2407        use crate::quality::{CompiledQuality, OnFailure, QualitySpec, RecordCheck};
2408        use metrics_util::debugging::DebugValue;
2409
2410        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
2411        let snap = snapshotter();
2412
2413        // MockSink accepts everything → no sink-side failures, so the only DLQ
2414        // traffic comes from the quality quarantine.
2415        let main = Arc::new(MockSink::new());
2416        let dlq_sink = Arc::new(MockSink::new());
2417        let spec = QualitySpec {
2418            record: vec![RecordCheck::NotNull {
2419                field: "id".into(),
2420                treat_missing_as_null: true,
2421                on_failure: OnFailure::Quarantine,
2422            }],
2423            batch: vec![],
2424        };
2425        let quality = Arc::new(CompiledQuality::compile(&spec).unwrap());
2426        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2427            records: vec![json!({"id": 1}), json!({"id": null}), json!({"id": 3})],
2428            bookmark: None,
2429        })];
2430        let opts = RunStreamOptions::new()
2431            .with_name("p_quality_reason")
2432            .with_dlq(DlqConfig::new(dlq_sink.clone()))
2433            .with_quality(quality);
2434        let _ = run_stream(futures::stream::iter(pages), main.as_ref(), opts)
2435            .await
2436            .unwrap();
2437
2438        let snapshot = snap.snapshot();
2439        let saw_quality_reason = snapshot.into_vec().into_iter().any(|(k, _, _, v)| {
2440            k.key().name() == "faucet_sink_dlq_pages_total"
2441                && k.key()
2442                    .labels()
2443                    .any(|l| l.key() == "pipeline" && l.value() == "p_quality_reason")
2444                && k.key()
2445                    .labels()
2446                    .any(|l| l.key() == "reason" && l.value() == "quality")
2447                && matches!(v, DebugValue::Counter(c) if c >= 1)
2448        });
2449        assert!(
2450            saw_quality_reason,
2451            "expected faucet_sink_dlq_pages_total with reason=\"quality\""
2452        );
2453    }
2454
2455    #[cfg(feature = "quality")]
2456    #[tokio::test]
2457    async fn quality_abort_fails_run() {
2458        use crate::quality::{BatchCheck, CompiledQuality, OnFailure, QualitySpec};
2459        let main = MockSink::new();
2460        let spec = QualitySpec {
2461            record: vec![],
2462            batch: vec![BatchCheck::RowCount {
2463                min: Some(5),
2464                max: None,
2465                on_failure: OnFailure::Abort,
2466            }],
2467        };
2468        let quality = Arc::new(CompiledQuality::compile(&spec).unwrap());
2469        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2470            records: vec![json!({"id": 1})],
2471            bookmark: None,
2472        })];
2473        let opts = RunStreamOptions::new().with_quality(quality);
2474        let result = run_stream(futures::stream::iter(pages), &main, opts).await;
2475        assert!(matches!(result, Err(FaucetError::QualityFailure { .. })));
2476    }
2477
2478    #[cfg(feature = "quality")]
2479    #[tokio::test]
2480    async fn quality_quarantine_without_dlq_is_rejected() {
2481        use crate::quality::{CompiledQuality, OnFailure, QualitySpec, RecordCheck};
2482        let main = MockSink::new();
2483        let spec = QualitySpec {
2484            record: vec![RecordCheck::NotNull {
2485                field: "id".into(),
2486                treat_missing_as_null: true,
2487                on_failure: OnFailure::Quarantine,
2488            }],
2489            batch: vec![],
2490        };
2491        let quality = Arc::new(CompiledQuality::compile(&spec).unwrap());
2492        let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2493            records: vec![json!({"id": null})],
2494            bookmark: None,
2495        })];
2496        // No .with_dlq(...) — must be rejected up front.
2497        let opts = RunStreamOptions::new().with_quality(quality);
2498        let result = run_stream(futures::stream::iter(pages), &main, opts).await;
2499        assert!(matches!(result, Err(FaucetError::Config(_))));
2500    }
2501
2502    /// Sink whose write_batch_partial fails every Nth record; drives the
2503    /// error-rate signal. Requires a DLQ in run_stream.
2504    struct FlakySink {
2505        every: usize,
2506        calls: std::sync::Mutex<Vec<usize>>,
2507    }
2508    impl FlakySink {
2509        fn new(every: usize) -> Self {
2510            Self {
2511                every,
2512                calls: std::sync::Mutex::new(Vec::new()),
2513            }
2514        }
2515        fn call_sizes(&self) -> Vec<usize> {
2516            self.calls.lock().unwrap().clone()
2517        }
2518    }
2519    #[async_trait]
2520    impl Sink for FlakySink {
2521        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
2522            Ok(records.len())
2523        }
2524        async fn write_batch_partial(
2525            &self,
2526            records: &[Value],
2527        ) -> Result<Vec<crate::RowOutcome>, FaucetError> {
2528            self.calls.lock().unwrap().push(records.len());
2529            Ok(records
2530                .iter()
2531                .enumerate()
2532                .map(|(i, _)| {
2533                    if (i + 1) % self.every == 0 {
2534                        Err(FaucetError::Sink("synthetic".into()))
2535                    } else {
2536                        Ok(())
2537                    }
2538                })
2539                .collect())
2540        }
2541    }
2542
2543    #[tokio::test]
2544    async fn adaptive_shrinks_under_errors_on_dlq_path() {
2545        use crate::adaptive::AdaptiveBatchConfig;
2546        use crate::dlq::{DlqConfig, OnBatchError};
2547        // Three pages of 400 records each, matching the pattern used by
2548        // adaptive_shrinks_under_latency_target_then_smaller_chunks. After
2549        // page 1 (single 400-record chunk, 25% error rate > threshold 0.1),
2550        // the controller shrinks and subsequent pages get smaller sub-batches.
2551        let mk = || StreamPage {
2552            records: (0..400).map(|i| json!({"i": i})).collect(),
2553            bookmark: None,
2554        };
2555        let stream = futures::stream::iter(vec![Ok(mk()), Ok(mk()), Ok(mk())]);
2556        let sink = FlakySink::new(4); // 25% error rate > threshold 0.1
2557        let dlq_sink: Arc<dyn Sink> = Arc::new(MockSink::new());
2558        let dlq = DlqConfig {
2559            sink: dlq_sink,
2560            on_batch_error: OnBatchError::Propagate,
2561            max_failures_per_page: None,
2562            max_failures_total: None,
2563            include_original_payload: true,
2564        };
2565        let cfg: AdaptiveBatchConfig = serde_json::from_value(json!({
2566            "enabled": true, "min": 50, "max": 400,
2567            "decrease_factor": 0.5, "cooldown_batches": 0, "error_threshold": 0.1
2568        }))
2569        .unwrap();
2570        let opts = RunStreamOptions::new().with_dlq(dlq).with_adaptive(cfg);
2571        let result = run_stream(stream, &sink, opts).await.unwrap();
2572        // 3 × 400 = 1200 records total; FlakySink(4) fails every 4th record
2573        // per-chunk (floor(n/4)), so exact counts depend on chunk sizes due to
2574        // integer arithmetic. With the controller shrinking under 25% error
2575        // rate: page 1 = one 400-record chunk (300 written, 100 DLQ); pages
2576        // 2–3 = smaller sub-batches; overall >≈75% of 1200 commit and ~25% go
2577        // to the DLQ.
2578        assert!(
2579            result.records_written >= 900,
2580            "expected ≥900 written, got {}",
2581            result.records_written
2582        );
2583        let sizes = sink.call_sizes();
2584        assert_eq!(sizes[0], 400, "first chunk is the full page");
2585        assert!(
2586            sizes.last().unwrap() < &400,
2587            "controller should shrink under errors: {sizes:?}"
2588        );
2589        assert!(
2590            result.dlq.unwrap().records_dlq >= 250,
2591            "expected ≥250 DLQ records"
2592        );
2593    }
2594
2595    // ── Adaptive batch-size tests ──────────────────────────────────────────
2596
2597    /// A sink that records each write_batch call's size and reports a fixed
2598    /// per-call latency, so we can assert the adaptive controller resliced.
2599    struct RecordingSink {
2600        calls: std::sync::Mutex<Vec<usize>>,
2601        latency: std::time::Duration,
2602    }
2603    impl RecordingSink {
2604        fn new(latency_ms: u64) -> Self {
2605            Self {
2606                calls: std::sync::Mutex::new(Vec::new()),
2607                latency: std::time::Duration::from_millis(latency_ms),
2608            }
2609        }
2610        fn call_sizes(&self) -> Vec<usize> {
2611            self.calls.lock().unwrap().clone()
2612        }
2613    }
2614    #[async_trait]
2615    impl Sink for RecordingSink {
2616        async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
2617            tokio::time::sleep(self.latency).await;
2618            self.calls.lock().unwrap().push(records.len());
2619            Ok(records.len())
2620        }
2621    }
2622
2623    #[tokio::test]
2624    async fn adaptive_reslices_non_dlq_page_into_subbatches() {
2625        use crate::adaptive::AdaptiveBatchConfig;
2626        let page = StreamPage {
2627            records: (0..1000).map(|i| json!({ "i": i })).collect(),
2628            bookmark: None,
2629        };
2630        let stream = futures::stream::iter(vec![Ok(page)]);
2631        let sink = RecordingSink::new(0);
2632        let cfg: AdaptiveBatchConfig =
2633            serde_json::from_value(json!({"enabled": true, "min": 100, "max": 1000})).unwrap();
2634        let result = run_stream(stream, &sink, RunStreamOptions::new().with_adaptive(cfg))
2635            .await
2636            .unwrap();
2637        assert_eq!(result.records_written, 1000);
2638        // current starts at min(max, page_len)=1000 → one chunk (no regression).
2639        assert_eq!(sink.call_sizes(), vec![1000]);
2640    }
2641
2642    #[tokio::test]
2643    async fn adaptive_shrinks_under_latency_target_then_smaller_chunks() {
2644        use crate::adaptive::AdaptiveBatchConfig;
2645        let mk = || StreamPage {
2646            records: (0..400).map(|i| json!({"i": i})).collect(),
2647            bookmark: None,
2648        };
2649        let stream = futures::stream::iter(vec![Ok(mk()), Ok(mk()), Ok(mk())]);
2650        let sink = RecordingSink::new(50);
2651        let cfg: AdaptiveBatchConfig = serde_json::from_value(json!({
2652            "enabled": true, "min": 50, "max": 400,
2653            "decrease_factor": 0.5, "cooldown_batches": 0,
2654            "target_latency_ms": 10, "latency_window": 1
2655        }))
2656        .unwrap();
2657        let result = run_stream(stream, &sink, RunStreamOptions::new().with_adaptive(cfg))
2658            .await
2659            .unwrap();
2660        assert_eq!(result.records_written, 1200);
2661        let sizes = sink.call_sizes();
2662        assert_eq!(sizes[0], 400);
2663        assert!(
2664            sizes.last().unwrap() < &400,
2665            "controller should have shrunk: {sizes:?}"
2666        );
2667    }
2668
2669    #[tokio::test]
2670    #[allow(clippy::await_holding_lock)]
2671    async fn adaptive_emits_batch_size_and_adjustments_metrics() {
2672        // Mirror the same LOCK+snapshotter pattern used by
2673        // `pipeline_run_increments_runs_total` and `dlq_emits_records_total_and_pages_total`.
2674        use crate::adaptive::AdaptiveBatchConfig;
2675        use crate::observability::decorator::source_tests::{LOCK, snapshotter};
2676        use metrics_util::debugging::DebugValue;
2677
2678        let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
2679        let snap = snapshotter();
2680
2681        // Three pages of 400 records. RecordingSink(50) reports 50ms latency.
2682        // Config: target_latency_ms=10, latency_window=1, cooldown_batches=0 so
2683        // p50 (50ms) > 10*1.2=12ms on every batch → controller shrinks each time,
2684        // guaranteeing at least one `faucet_pipeline_adaptive_batch_adjustments_total`
2685        // is emitted.
2686        let mk = || StreamPage {
2687            records: (0..400).map(|i| json!({"i": i})).collect(),
2688            bookmark: None,
2689        };
2690        let stream = futures::stream::iter(vec![Ok(mk()), Ok(mk()), Ok(mk())]);
2691        let sink = RecordingSink::new(50);
2692        let cfg: AdaptiveBatchConfig = serde_json::from_value(json!({
2693            "enabled": true, "min": 50, "max": 400,
2694            "decrease_factor": 0.5, "cooldown_batches": 0,
2695            "target_latency_ms": 10, "latency_window": 1
2696        }))
2697        .unwrap();
2698
2699        let _ = run_stream(
2700            stream,
2701            &sink,
2702            RunStreamOptions::new()
2703                .with_adaptive(cfg)
2704                .with_name("p")
2705                .with_row("r"),
2706        )
2707        .await
2708        .unwrap();
2709
2710        let snapshot = snap.snapshot();
2711        let mut saw_batch_size = false;
2712        let mut saw_adjustments = false;
2713        for (k, _u, _d, v) in snapshot.into_vec() {
2714            let key = k.key();
2715            let labels = key.labels().collect::<Vec<_>>();
2716            let has = |k: &str, val: &str| labels.iter().any(|l| l.key() == k && l.value() == val);
2717
2718            if key.name() == "faucet_pipeline_adaptive_batch_size"
2719                && has("pipeline", "p")
2720                && has("row", "r")
2721                && matches!(v, DebugValue::Gauge(_))
2722            {
2723                saw_batch_size = true;
2724            }
2725            if key.name() == "faucet_pipeline_adaptive_batch_adjustments_total"
2726                && has("pipeline", "p")
2727                && has("row", "r")
2728                && matches!(v, DebugValue::Counter(c) if c >= 1)
2729            {
2730                saw_adjustments = true;
2731            }
2732        }
2733        assert!(
2734            saw_batch_size,
2735            "expected faucet_pipeline_adaptive_batch_size gauge with pipeline=p, row=r"
2736        );
2737        assert!(
2738            saw_adjustments,
2739            "expected faucet_pipeline_adaptive_batch_adjustments_total counter with pipeline=p, row=r"
2740        );
2741    }
2742}