1use crate::dlq::{DlqConfig, DlqStats};
43use crate::error::FaucetError;
44use crate::observability::RunStreamOptions;
45use crate::state::{StateStore, validate_state_key};
46use crate::traits::{Sink, Source};
47use futures_core::Stream;
48use serde_json::Value;
49use std::pin::Pin;
50use std::sync::Arc;
51
/// Page size that [`Pipeline::run`] requests from its source via `stream_pages`.
pub const DEFAULT_BATCH_SIZE: usize = 1_000;

/// Largest batch size accepted by [`validate_batch_size`].
pub const MAX_BATCH_SIZE: usize = 1_000_000;
64
65pub fn validate_batch_size(batch_size: usize) -> Result<usize, FaucetError> {
77 if batch_size > MAX_BATCH_SIZE {
78 return Err(FaucetError::Config(format!(
79 "batch_size {batch_size} exceeds maximum {MAX_BATCH_SIZE} \
80 (use 0 to opt out of batching entirely)"
81 )));
82 }
83 Ok(batch_size)
84}
85
86#[derive(Debug, Clone, Default)]
96pub struct StreamPage {
97 pub records: Vec<Value>,
99 pub bookmark: Option<Value>,
101}
102
103#[derive(Debug, Clone)]
105pub struct PipelineResult {
106 pub records_written: usize,
108 pub bookmark: Option<Value>,
116 pub dlq: Option<DlqStats>,
118}
119
120pub struct Pipeline<'a, So: Source + ?Sized, Si: Sink + ?Sized> {
125 source: &'a So,
126 sink: &'a Si,
127 state_store: Option<Arc<dyn StateStore>>,
128 name: Option<String>,
129 row: Option<String>,
130 run_id: Option<String>,
131 dlq: Option<DlqConfig>,
132 #[cfg(feature = "quality")]
133 quality: Option<Arc<crate::quality::CompiledQuality>>,
134 adaptive: Option<crate::adaptive::AdaptiveBatchConfig>,
135 cancel: Option<tokio_util::sync::CancellationToken>,
136}
137
138impl<'a, So: Source + ?Sized, Si: Sink + ?Sized> Pipeline<'a, So, Si> {
139 pub fn new(source: &'a So, sink: &'a Si) -> Self {
141 Self {
142 source,
143 sink,
144 state_store: None,
145 name: None,
146 row: None,
147 run_id: None,
148 dlq: None,
149 #[cfg(feature = "quality")]
150 quality: None,
151 adaptive: None,
152 cancel: None,
153 }
154 }
155
156 pub fn with_state_store(mut self, store: Arc<dyn StateStore>) -> Self {
170 self.state_store = Some(store);
171 self
172 }
173
174 pub fn with_name(mut self, name: impl Into<String>) -> Self {
177 self.name = Some(name.into());
178 self
179 }
180
181 pub fn with_row(mut self, row: impl Into<String>) -> Self {
184 self.row = Some(row.into());
185 self
186 }
187
188 pub fn with_run_id(mut self, run_id: impl Into<String>) -> Self {
192 self.run_id = Some(run_id.into());
193 self
194 }
195
196 pub fn with_dlq(mut self, dlq: DlqConfig) -> Self {
198 self.dlq = Some(dlq);
199 self
200 }
201
202 #[cfg(feature = "quality")]
205 pub fn with_quality(mut self, quality: Arc<crate::quality::CompiledQuality>) -> Self {
206 self.quality = Some(quality);
207 self
208 }
209
210 pub fn with_adaptive(mut self, cfg: crate::adaptive::AdaptiveBatchConfig) -> Self {
214 self.adaptive = Some(cfg);
215 self
216 }
217
218 pub fn with_cancel(mut self, cancel: tokio_util::sync::CancellationToken) -> Self {
223 self.cancel = Some(cancel);
224 self
225 }
226
227 pub async fn run(&self) -> Result<PipelineResult, FaucetError> {
241 use crate::observability::{
242 DurationGuard, InstrumentedSink, InstrumentedSource, InstrumentedStateStore, Labels,
243 };
244 use metrics::{Label, SharedString, counter, gauge};
245 use tracing::Instrument;
246
247 let name = self.name.clone().unwrap_or_else(|| "unnamed".to_string());
249 let row = self.row.clone().unwrap_or_default();
250 let run_id = self
251 .run_id
252 .clone()
253 .unwrap_or_else(|| uuid::Uuid::now_v7().to_string());
254 let obs_labels = Labels::new(name.clone(), row.clone(), run_id.clone());
255
256 let wrapped_source = InstrumentedSource::new(self.source, obs_labels.clone());
258 let wrapped_sink = InstrumentedSink::new(self.sink, obs_labels.clone());
259 let wrapped_state_store: Option<Arc<dyn StateStore>> = self.state_store.as_ref().map(|s| {
260 Arc::new(InstrumentedStateStore::new(
261 Arc::clone(s),
262 obs_labels.clone(),
263 )) as Arc<dyn StateStore>
264 });
265
266 let span = tracing::info_span!(
269 "faucet.pipeline.run",
270 pipeline = %name,
271 row = %row,
272 run_id = %run_id,
273 source = %wrapped_source.connector_name(),
274 sink = %wrapped_sink.connector_name(),
275 );
276
277 let base_labels: Vec<Label> = vec![
279 Label::new("pipeline", SharedString::from(name.clone())),
280 Label::new("row", SharedString::from(row.clone())),
281 ];
282 let run_labels: Vec<Label> = {
283 let mut v = base_labels.clone();
284 v.push(Label::new(
285 "source",
286 SharedString::from(wrapped_source.connector_name().to_string()),
287 ));
288 v.push(Label::new(
289 "sink",
290 SharedString::from(wrapped_sink.connector_name().to_string()),
291 ));
292 v
293 };
294
295 struct InFlightGuard(Vec<Label>);
297 impl Drop for InFlightGuard {
298 fn drop(&mut self) {
299 gauge!("faucet_pipeline_in_flight", self.0.clone()).decrement(1.0);
300 }
301 }
302 gauge!("faucet_pipeline_in_flight", base_labels.clone()).increment(1.0);
303 let _in_flight = InFlightGuard(base_labels.clone());
304
305 let start_unix = std::time::SystemTime::now()
308 .duration_since(std::time::UNIX_EPOCH)
309 .map(|d| d.as_secs_f64())
310 .unwrap_or(0.0);
311 gauge!(
312 "faucet_pipeline_start_time_unix_seconds",
313 base_labels.clone()
314 )
315 .set(start_unix);
316
317 let _run_timer =
319 DurationGuard::new("faucet_pipeline_run_duration_seconds", run_labels.clone());
320
321 let result = async {
323 let state_key = self.source.state_key();
326 if let (Some(store), Some(key)) = (wrapped_state_store.as_ref(), state_key.as_ref()) {
327 validate_state_key(key)?;
328 if let Some(prior) = store.get(key).await? {
329 wrapped_source.apply_start_bookmark(prior).await?;
330 }
331 }
332
333 let ctx = std::collections::HashMap::new();
334 let pages = wrapped_source.stream_pages(&ctx, DEFAULT_BATCH_SIZE);
335
336 let mut opts = RunStreamOptions::new()
337 .with_name(name.clone())
338 .with_row(row.clone())
339 .with_run_id(run_id.clone());
340 if let (Some(store), Some(key)) = (wrapped_state_store.clone(), state_key) {
341 opts = opts.with_state(store, key);
342 }
343 if let Some(dlq) = self.dlq.clone() {
344 opts = opts.with_dlq(dlq);
345 }
346 #[cfg(feature = "quality")]
347 if let Some(q) = self.quality.clone() {
348 opts = opts.with_quality(q);
349 }
350 if let Some(ad) = self.adaptive.clone() {
351 opts = opts.with_adaptive(ad);
352 }
353 if let Some(cancel) = self.cancel.clone() {
354 opts = opts.with_cancel(cancel);
355 }
356
357 run_stream(pages, &wrapped_sink, opts).await
358 }
359 .instrument(span)
360 .await;
361
362 let status = if result.is_ok() { "ok" } else { "err" };
366 let mut final_labels = run_labels;
367 final_labels.push(Label::new("status", SharedString::const_str(status)));
368 if let Err(ref e) = result {
369 final_labels.push(Label::new(
370 "kind",
371 SharedString::const_str(crate::observability::decorator::error_kind(e)),
372 ));
373 }
374 counter!("faucet_pipeline_runs_total", final_labels).increment(1);
375
376 result
377 }
378}
379
380pub async fn run_stream<S, Si>(
398 mut pages: S,
399 sink: &Si,
400 options: RunStreamOptions,
401) -> Result<PipelineResult, FaucetError>
402where
403 S: Stream<Item = Result<StreamPage, FaucetError>> + Unpin,
404 Si: Sink + ?Sized,
405{
406 use crate::dlq::{DlqStats, OnBatchError, build_envelope};
407
408 let state_store = options.state_store.clone();
409 let state_key = options.state_key.clone();
410 let pipeline_name = options.pipeline_name.unwrap_or_else(|| "unnamed".into());
411 let row = options.row.unwrap_or_default();
412 let run_id = options.run_id.unwrap_or_default();
413 let dlq = options.dlq.clone();
414 let cancel = options.cancel.clone();
415
416 #[cfg(feature = "quality")]
417 let quality = options.quality.clone();
418
419 #[cfg(feature = "quality")]
421 if let Some(q) = quality.as_ref()
422 && q.requires_dlq()
423 && dlq.is_none()
424 {
425 return Err(FaucetError::Config(
426 "quality: on_failure 'quarantine'/'quarantine_batch' requires a DLQ sink".into(),
427 ));
428 }
429
430 if let Some(key) = state_key.as_ref() {
431 validate_state_key(key)?;
432 }
433
434 let mut records_written = 0usize;
435 let mut last_bookmark: Option<Value> = None;
436 let mut dlq_stats = DlqStats::default();
437
438 let adaptive_cfg = options.adaptive.clone().filter(|c| c.enabled);
439 if let Some(cfg) = adaptive_cfg.as_ref() {
443 cfg.validate()?;
444 }
445 let mut controller: Option<crate::adaptive::AimdController> = None;
446 let mut warned_noop_sink = false;
447
448 let sink_name = sink.connector_name();
449 let dlq_sink_name = dlq.as_ref().map(|d| d.sink.connector_name()).unwrap_or("");
450
451 let mut cancelled = false;
462 let loop_result: Result<(), FaucetError> = async {
463 loop {
464 let page = match &cancel {
468 Some(token) => tokio::select! {
469 biased;
470 _ = token.cancelled() => {
471 cancelled = true;
472 break;
473 }
474 p = std::future::poll_fn(|cx| Pin::new(&mut pages).poll_next(cx)) => p,
475 },
476 None => std::future::poll_fn(|cx| Pin::new(&mut pages).poll_next(cx)).await,
477 };
478 match page {
479 Some(Ok(page)) => {
480 if page.records.is_empty() && page.bookmark.is_none() {
481 continue;
482 }
483
484 #[cfg(feature = "quality")]
486 let (records, quality_envelopes): (Vec<Value>, Vec<Value>) =
487 if let Some(q) = quality.as_ref() {
488 let labels =
489 crate::observability::Labels::new(&*pipeline_name, &*row, &*run_id);
490 let outcome = crate::observability::instrumented_apply_quality(
491 page.records,
492 q,
493 &labels,
494 )?;
495 let envelopes: Vec<Value> = outcome
496 .quarantined
497 .iter()
498 .map(|qr| {
499 let err = FaucetError::QualityFailure {
500 check: qr.check.to_string(),
501 message: qr.message.clone(),
502 };
503 build_envelope(
507 &qr.record,
508 &err,
509 sink_name,
510 &pipeline_name,
511 &row,
512 qr.page_index,
513 )
514 })
515 .collect();
516 (outcome.survivors, envelopes)
517 } else {
518 (page.records, Vec::new())
519 };
520 #[cfg(not(feature = "quality"))]
521 let (records, quality_envelopes): (Vec<Value>, Vec<Value>) =
522 (page.records, Vec::new());
523
524 let page = StreamPage {
525 records,
526 bookmark: page.bookmark,
527 };
528
529 if let Some(ref dlq_cfg) = dlq {
530 use crate::dlq::DlqReason;
532 use metrics::{Label, SharedString, counter};
533 let metric_labels: Vec<Label> = vec![
534 Label::new("pipeline", SharedString::from(pipeline_name.clone())),
535 Label::new("row", SharedString::from(row.clone())),
536 Label::new("connector", SharedString::from(sink_name.to_string())),
537 Label::new(
538 "dlq_connector",
539 SharedString::from(dlq_sink_name.to_string()),
540 ),
541 ];
542 let span = tracing::info_span!(
543 "faucet.dlq.route",
544 pipeline = %pipeline_name,
545 row = %row,
546 run_id = %run_id,
547 connector = %sink_name,
548 dlq_connector = %dlq_sink_name,
549 );
550 let _enter = span.enter();
551
552 let mut envelopes: Vec<Value> = Vec::new();
556 let mut page_success = 0usize;
557 let mut outer_err_recovered = false;
558 let mut had_per_row_sink_failure = false;
563 let records_len = page.records.len();
564 let mut offset = 0usize;
565 while offset < records_len {
566 let size = match adaptive_cfg.as_ref() {
567 Some(cfg) => {
568 let ctrl = controller.get_or_insert_with(|| {
569 crate::adaptive::AimdController::new(cfg, records_len)
570 });
571 ctrl.current().max(1).min(records_len - offset)
572 }
573 None => records_len - offset, };
575 if adaptive_cfg.is_some() {
576 maybe_warn_noop_sink(sink_name, &mut warned_noop_sink);
577 }
578 let chunk = &page.records[offset..offset + size];
579 let t0 = std::time::Instant::now();
580 let chunk_outcomes_result = sink.write_batch_partial(chunk).await;
581 let latency = t0.elapsed();
582 let (chunk_outcomes, chunk_synthesized): (
591 Vec<crate::RowOutcome>,
592 bool,
593 ) = match chunk_outcomes_result {
594 Ok(o) => (o, false),
595 Err(e) => match dlq_cfg.on_batch_error {
596 OnBatchError::Propagate => return Err(e),
597 OnBatchError::DlqAll => {
598 outer_err_recovered = true;
599 let msg = e.to_string();
600 let synth = (0..chunk.len())
601 .map(|_| Err(FaucetError::Sink(msg.clone())))
602 .collect();
603 (synth, true)
604 }
605 },
606 };
607 let mut chunk_errors = 0usize;
608 for (j, outcome) in chunk_outcomes.iter().enumerate() {
609 match outcome {
610 Ok(()) => page_success += 1,
611 Err(err) => {
612 chunk_errors += 1;
613 if !chunk_synthesized {
614 had_per_row_sink_failure = true;
615 }
616 envelopes.push(build_envelope(
617 &chunk[j],
618 err,
619 sink_name,
620 &pipeline_name,
621 &row,
622 offset + j,
623 ));
624 }
625 }
626 }
627 if let Some(ctrl) = controller.as_mut() {
628 let adj = ctrl.observe(crate::adaptive::Observation {
629 batch_len: chunk.len(),
630 errors: chunk_errors,
631 latency,
632 });
633 emit_adaptive_metrics(ctrl, adj, &pipeline_name, &row);
634 }
635 offset += size;
636 }
637 #[cfg(feature = "quality")]
642 let quality_count = quality_envelopes.len();
643 #[cfg(not(feature = "quality"))]
644 let quality_count = 0usize;
645 envelopes.splice(0..0, quality_envelopes);
646 let page_failures = envelopes.len();
647
648 let mut budget_error: Option<FaucetError> = None;
661 if let Some(limit) = dlq_cfg.max_failures_per_page
662 && page_failures > limit
663 {
664 let mut lbl = metric_labels.clone();
665 lbl.retain(|l| l.key() != "dlq_connector");
666 lbl.push(Label::new("scope", SharedString::const_str("per_page")));
667 counter!("faucet_sink_dlq_budget_exceeded_total", lbl).increment(1);
668 budget_error = Some(FaucetError::Sink(format!(
669 "DLQ per-page budget exceeded: {page_failures} > {limit}"
670 )));
671 }
672 let new_total = dlq_stats.records_dlq + page_failures;
673 if budget_error.is_none()
674 && let Some(limit) = dlq_cfg.max_failures_total
675 && new_total > limit
676 {
677 let mut lbl = metric_labels.clone();
678 lbl.retain(|l| l.key() != "dlq_connector");
679 lbl.push(Label::new("scope", SharedString::const_str("total")));
680 counter!("faucet_sink_dlq_budget_exceeded_total", lbl).increment(1);
681 budget_error = Some(FaucetError::Sink(format!(
682 "DLQ total budget exceeded: {new_total} > {limit}"
683 )));
684 }
685
686 if !envelopes.is_empty() {
688 let _dlq_write_timer = crate::observability::DurationGuard::new(
689 "faucet_sink_dlq_write_duration_seconds",
690 metric_labels.clone(),
691 );
692 dlq_cfg.sink.write_batch(&envelopes).await.map_err(|e| {
693 let mut lbl = metric_labels.clone();
694 lbl.push(Label::new(
695 "kind",
696 SharedString::const_str(
697 crate::observability::decorator::error_kind(&e),
698 ),
699 ));
700 counter!("faucet_sink_dlq_errors_total", lbl).increment(1);
701 FaucetError::Sink(format!("DLQ sink write failed: {e}"))
702 })?;
703 dlq_stats.records_dlq += page_failures;
704 dlq_stats.pages_with_failures += 1;
705
706 let reason_label = if had_per_row_sink_failure {
725 DlqReason::Partial.as_str()
726 } else if outer_err_recovered {
727 DlqReason::DlqAll.as_str()
728 } else if page_failures > quality_count {
729 DlqReason::Partial.as_str()
730 } else {
731 DlqReason::Quality.as_str()
732 };
733 counter!("faucet_sink_dlq_records_total", metric_labels.clone())
734 .increment(page_failures as u64);
735 let mut page_labels = metric_labels.clone();
736 page_labels
737 .push(Label::new("reason", SharedString::const_str(reason_label)));
738 counter!("faucet_sink_dlq_pages_total", page_labels).increment(1);
739 }
740
741 records_written += page_success;
742
743 if let Some(bookmark) = page.bookmark {
744 sink.flush().await?;
745 let _dlq_flush_timer = crate::observability::DurationGuard::new(
746 "faucet_sink_dlq_flush_duration_seconds",
747 metric_labels.clone(),
748 );
749 dlq_cfg.sink.flush().await.map_err(|e| {
750 let mut lbl = metric_labels.clone();
751 lbl.push(Label::new(
752 "kind",
753 SharedString::const_str(
754 crate::observability::decorator::error_kind(&e),
755 ),
756 ));
757 counter!("faucet_sink_dlq_errors_total", lbl).increment(1);
758 FaucetError::Sink(format!("DLQ sink flush failed: {e}"))
759 })?;
760 let bm_labels =
761 crate::observability::Labels::new(&*pipeline_name, &*row, &*run_id);
762 crate::observability::update_bookmark_lag(&bookmark, &bm_labels);
763 if let (Some(store), Some(key)) =
764 (state_store.as_ref(), state_key.as_ref())
765 {
766 store.put(key, &bookmark).await?;
767 }
768 last_bookmark = Some(bookmark);
769 }
770
771 if let Some(e) = budget_error {
778 return Err(e);
779 }
780 } else {
781 debug_assert!(
783 quality_envelopes.is_empty(),
784 "quality quarantine without DLQ should have been rejected at run start"
785 );
786 if !page.records.is_empty() {
787 if let Some(cfg) = adaptive_cfg.as_ref() {
788 let ctrl = controller.get_or_insert_with(|| {
789 crate::adaptive::AimdController::new(cfg, page.records.len())
790 });
791 maybe_warn_noop_sink(sink_name, &mut warned_noop_sink);
792 let mut offset = 0;
793 while offset < page.records.len() {
794 let size =
795 ctrl.current().max(1).min(page.records.len() - offset);
796 let chunk = &page.records[offset..offset + size];
797 let t0 = std::time::Instant::now();
798 let n = sink.write_batch(chunk).await?;
799 let latency = t0.elapsed();
800 records_written += n;
801 offset += size;
802 let adj = ctrl.observe(crate::adaptive::Observation {
803 batch_len: chunk.len(),
804 errors: 0,
805 latency,
806 });
807 emit_adaptive_metrics(ctrl, adj, &pipeline_name, &row);
808 }
809 } else {
810 records_written += sink.write_batch(&page.records).await?;
811 }
812 }
813 if let Some(bookmark) = page.bookmark {
814 sink.flush().await?;
815 let bm_labels =
816 crate::observability::Labels::new(&*pipeline_name, &*row, &*run_id);
817 crate::observability::update_bookmark_lag(&bookmark, &bm_labels);
818 if let (Some(store), Some(key)) =
819 (state_store.as_ref(), state_key.as_ref())
820 {
821 store.put(key, &bookmark).await?;
822 }
823 last_bookmark = Some(bookmark);
824 }
825 }
826 }
827 Some(Err(e)) => return Err(e),
828 None => break,
829 }
830 }
831 Ok(())
832 }
833 .await;
834
835 if let Err(e) = loop_result {
842 if let Some(ref dlq_cfg) = dlq
843 && let Err(flush_err) = dlq_cfg.sink.flush().await
844 {
845 tracing::warn!(
846 error = %flush_err,
847 "DLQ sink flush failed during error unwind; original error preserved"
848 );
849 }
850 if let Err(flush_err) = sink.flush().await {
851 tracing::warn!(
852 error = %flush_err,
853 "sink flush failed during error unwind; original error preserved"
854 );
855 }
856 return Err(e);
857 }
858
859 if let Some(ref dlq_cfg) = dlq {
865 let final_metric_labels: Vec<metrics::Label> = vec![
866 metrics::Label::new(
867 "pipeline",
868 metrics::SharedString::from(pipeline_name.clone()),
869 ),
870 metrics::Label::new("row", metrics::SharedString::from(row.clone())),
871 metrics::Label::new(
872 "connector",
873 metrics::SharedString::from(sink_name.to_string()),
874 ),
875 metrics::Label::new(
876 "dlq_connector",
877 metrics::SharedString::from(dlq_sink_name.to_string()),
878 ),
879 ];
880 let _final_dlq_flush_timer = crate::observability::DurationGuard::new(
881 "faucet_sink_dlq_flush_duration_seconds",
882 final_metric_labels.clone(),
883 );
884 dlq_cfg.sink.flush().await.map_err(|e| {
885 let mut lbl = final_metric_labels.clone();
886 lbl.push(metrics::Label::new(
887 "kind",
888 metrics::SharedString::const_str(crate::observability::decorator::error_kind(&e)),
889 ));
890 metrics::counter!("faucet_sink_dlq_errors_total", lbl).increment(1);
891 FaucetError::Sink(format!("DLQ sink flush failed: {e}"))
892 })?;
893 }
894 sink.flush().await?;
895
896 if cancelled {
897 tracing::info!(
898 records_written,
899 "pipeline run cancelled cooperatively; sink flushed (partial output is durable)"
900 );
901 }
902
903 tracing::info!(
904 records_written,
905 cancelled,
906 has_bookmark = last_bookmark.is_some(),
907 persisted = state_store.is_some() && state_key.is_some() && last_bookmark.is_some(),
908 dlq_records = dlq_stats.records_dlq,
909 "pipeline streaming run complete"
910 );
911
912 Ok(PipelineResult {
913 records_written,
914 bookmark: last_bookmark,
915 dlq: dlq.is_some().then_some(dlq_stats),
916 })
917}
918
919fn emit_adaptive_metrics(
922 ctrl: &crate::adaptive::AimdController,
923 adj: Option<crate::adaptive::Adjustment>,
924 pipeline: &str,
925 row: &str,
926) {
927 use metrics::{Label, SharedString, counter, gauge};
928 let base = vec![
929 Label::new("pipeline", SharedString::from(pipeline.to_string())),
930 Label::new("row", SharedString::from(row.to_string())),
931 ];
932 gauge!("faucet_pipeline_adaptive_batch_size", base.clone()).set(ctrl.current() as f64);
933 gauge!(
934 "faucet_pipeline_adaptive_batch_cooldown_active",
935 base.clone()
936 )
937 .set(if ctrl.cooldown_active() { 1.0 } else { 0.0 });
938 if let Some(p50) = ctrl.p50_latency_ms() {
939 gauge!(
940 "faucet_pipeline_adaptive_batch_p50_latency_ms",
941 base.clone()
942 )
943 .set(p50 as f64);
944 }
945 if let Some(a) = adj {
946 let mut lbl = base;
947 lbl.push(Label::new(
948 "direction",
949 SharedString::const_str(a.direction.as_str()),
950 ));
951 lbl.push(Label::new(
952 "reason",
953 SharedString::const_str(a.reason.as_str()),
954 ));
955 counter!("faucet_pipeline_adaptive_batch_adjustments_total", lbl).increment(1);
956 }
957}
958
959fn maybe_warn_noop_sink(sink_name: &str, warned: &mut bool) {
962 if !*warned && matches!(sink_name, "jsonl" | "csv" | "stdout") {
963 tracing::info!(
964 sink = sink_name,
965 "adaptive batch sizing is a no-op for this per-record sink"
966 );
967 *warned = true;
968 }
969}
970
971#[cfg(test)]
972mod tests {
973 use super::*;
974 use async_trait::async_trait;
975 use serde_json::json;
976
977 struct MockSource(Vec<Value>);
980
981 #[async_trait]
982 impl Source for MockSource {
983 async fn fetch_with_context(
984 &self,
985 _context: &std::collections::HashMap<String, Value>,
986 ) -> Result<Vec<Value>, FaucetError> {
987 Ok(self.0.clone())
988 }
989 }
990
991 struct IncrementalSource {
992 records: Vec<Value>,
993 bookmark: Value,
994 }
995
996 #[async_trait]
997 impl Source for IncrementalSource {
998 async fn fetch_with_context(
999 &self,
1000 _context: &std::collections::HashMap<String, Value>,
1001 ) -> Result<Vec<Value>, FaucetError> {
1002 Ok(self.records.clone())
1003 }
1004 async fn fetch_with_context_incremental(
1005 &self,
1006 _context: &std::collections::HashMap<String, Value>,
1007 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
1008 Ok((self.records.clone(), Some(self.bookmark.clone())))
1009 }
1010 }
1011
1012 struct FailingSource;
1013
1014 #[async_trait]
1015 impl Source for FailingSource {
1016 async fn fetch_with_context(
1017 &self,
1018 _context: &std::collections::HashMap<String, Value>,
1019 ) -> Result<Vec<Value>, FaucetError> {
1020 Err(FaucetError::Auth("no credentials".into()))
1021 }
1022 }
1023
1024 struct MockSink(std::sync::Mutex<Vec<Value>>);
1027
1028 impl MockSink {
1029 fn new() -> Self {
1030 Self(std::sync::Mutex::new(Vec::new()))
1031 }
1032 fn written(&self) -> Vec<Value> {
1033 self.0.lock().unwrap().clone()
1034 }
1035 }
1036
1037 #[async_trait]
1038 impl Sink for MockSink {
1039 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
1040 self.0.lock().unwrap().extend(records.iter().cloned());
1041 Ok(records.len())
1042 }
1043 }
1044
1045 struct FailingSink;
1046
1047 #[async_trait]
1048 impl Sink for FailingSink {
1049 async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
1050 Err(FaucetError::Sink("write failed".into()))
1051 }
1052 }
1053
1054 struct FlushTrackingSink {
1059 written: std::sync::Mutex<Vec<Value>>,
1060 flush_count: std::sync::atomic::AtomicUsize,
1061 }
1062
1063 impl FlushTrackingSink {
1064 fn new() -> Self {
1065 Self {
1066 written: std::sync::Mutex::new(Vec::new()),
1067 flush_count: std::sync::atomic::AtomicUsize::new(0),
1068 }
1069 }
1070 fn written(&self) -> Vec<Value> {
1071 self.written.lock().unwrap().clone()
1072 }
1073 fn flush_count(&self) -> usize {
1074 self.flush_count.load(std::sync::atomic::Ordering::SeqCst)
1075 }
1076 }
1077
1078 #[async_trait]
1079 impl Sink for FlushTrackingSink {
1080 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
1081 self.written.lock().unwrap().extend(records.iter().cloned());
1082 Ok(records.len())
1083 }
1084 async fn flush(&self) -> Result<(), FaucetError> {
1085 self.flush_count
1086 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1087 Ok(())
1088 }
1089 }
1090
1091 #[test]
1094 fn stream_page_constructs() {
1095 let page = StreamPage {
1096 records: vec![json!({"id": 1})],
1097 bookmark: Some(json!("2026-05-18")),
1098 };
1099 assert_eq!(page.records.len(), 1);
1100 assert_eq!(page.bookmark, Some(json!("2026-05-18")));
1101 }
1102
1103 #[test]
1104 fn validate_batch_size_accepts_zero_as_no_batching_sentinel() {
1105 assert_eq!(validate_batch_size(0).unwrap(), 0);
1107 }
1108
1109 #[test]
1110 fn validate_batch_size_rejects_too_large() {
1111 let err = validate_batch_size(MAX_BATCH_SIZE + 1).unwrap_err();
1112 assert!(matches!(err, FaucetError::Config(_)));
1113 }
1114
1115 #[test]
1116 fn validate_batch_size_accepts_one() {
1117 assert_eq!(validate_batch_size(1).unwrap(), 1);
1118 }
1119
1120 #[test]
1121 fn validate_batch_size_accepts_max() {
1122 assert_eq!(validate_batch_size(MAX_BATCH_SIZE).unwrap(), MAX_BATCH_SIZE);
1123 }
1124
1125 const _: () = {
1127 assert!(DEFAULT_BATCH_SIZE >= 1);
1128 assert!(DEFAULT_BATCH_SIZE <= MAX_BATCH_SIZE);
1129 };
1130
1131 #[tokio::test]
1134 async fn batch_pipeline_writes_all_records() {
1135 let source = MockSource(vec![json!({"id": 1}), json!({"id": 2}), json!({"id": 3})]);
1136 let sink = MockSink::new();
1137
1138 let result = Pipeline::new(&source, &sink).run().await.unwrap();
1139
1140 assert_eq!(result.records_written, 3);
1141 assert!(result.bookmark.is_none());
1142 assert_eq!(sink.written().len(), 3);
1143 }
1144
1145 #[tokio::test]
1146 async fn batch_pipeline_returns_bookmark() {
1147 let source = IncrementalSource {
1148 records: vec![json!({"id": 1, "ts": "2024-12-01"})],
1149 bookmark: json!("2024-12-01"),
1150 };
1151 let sink = MockSink::new();
1152
1153 let result = Pipeline::new(&source, &sink).run().await.unwrap();
1154
1155 assert_eq!(result.records_written, 1);
1156 assert_eq!(result.bookmark, Some(json!("2024-12-01")));
1157 }
1158
1159 #[tokio::test]
1160 async fn batch_pipeline_empty_source() {
1161 let source = MockSource(vec![]);
1162 let sink = MockSink::new();
1163
1164 let result = Pipeline::new(&source, &sink).run().await.unwrap();
1165
1166 assert_eq!(result.records_written, 0);
1167 assert!(sink.written().is_empty());
1168 }
1169
1170 #[tokio::test]
1171 async fn batch_pipeline_source_error_propagates() {
1172 let source = FailingSource;
1173 let sink = MockSink::new();
1174
1175 let result = Pipeline::new(&source, &sink).run().await;
1176 assert!(result.is_err());
1177 assert!(sink.written().is_empty());
1178 }
1179
1180 #[tokio::test]
1181 async fn batch_pipeline_sink_error_propagates() {
1182 let source = MockSource(vec![json!({"id": 1})]);
1183 let sink = FailingSink;
1184
1185 let result = Pipeline::new(&source, &sink).run().await;
1186 assert!(result.is_err());
1187 }
1188
1189 #[tokio::test]
1190 async fn batch_pipeline_with_trait_objects() {
1191 let source: Box<dyn Source> = Box::new(MockSource(vec![json!({"id": 1})]));
1192 let sink: Box<dyn Sink> = Box::new(MockSink::new());
1193
1194 let result = Pipeline::new(source.as_ref(), sink.as_ref())
1195 .run()
1196 .await
1197 .unwrap();
1198
1199 assert_eq!(result.records_written, 1);
1200 }
1201
1202 #[tokio::test]
1205 async fn stream_pipeline_writes_pages() {
1206 let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1207 Ok(StreamPage {
1208 records: vec![json!({"id": 1}), json!({"id": 2})],
1209 bookmark: None,
1210 }),
1211 Ok(StreamPage {
1212 records: vec![json!({"id": 3})],
1213 bookmark: None,
1214 }),
1215 ];
1216 let stream = futures::stream::iter(pages);
1217 let sink = MockSink::new();
1218
1219 let result = run_stream(stream, &sink, RunStreamOptions::new())
1220 .await
1221 .unwrap();
1222
1223 assert_eq!(result.records_written, 3);
1224 assert!(result.bookmark.is_none());
1225 assert_eq!(sink.written().len(), 3);
1226 }
1227
1228 #[tokio::test]
1229 async fn stream_pipeline_flushes_sink_on_source_error() {
1230 let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1234 Ok(StreamPage {
1235 records: vec![json!({"id": 1}), json!({"id": 2})],
1236 bookmark: None,
1237 }),
1238 Err(FaucetError::Source("transient blip mid-stream".into())),
1239 ];
1240 let stream = futures::stream::iter(pages);
1241 let sink = FlushTrackingSink::new();
1242
1243 let result = run_stream(stream, &sink, RunStreamOptions::new()).await;
1244
1245 assert!(matches!(result, Err(FaucetError::Source(_))));
1247 assert_eq!(sink.written().len(), 2);
1249 assert!(
1251 sink.flush_count() >= 1,
1252 "sink must be flushed on the error path so partial output is durable"
1253 );
1254 }
1255
1256 #[tokio::test]
1257 async fn stream_pipeline_flushes_sink_on_cancel() {
1258 use tokio_util::sync::CancellationToken;
1263
1264 let stream = Box::pin(async_stream::stream! {
1266 yield Ok(StreamPage {
1267 records: vec![json!({"id": 1}), json!({"id": 2})],
1268 bookmark: None,
1269 });
1270 futures::future::pending::<()>().await;
1271 });
1272 let sink = FlushTrackingSink::new();
1273
1274 let token = CancellationToken::new();
1275 let canceller = token.clone();
1276 tokio::spawn(async move {
1277 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1278 canceller.cancel();
1279 });
1280
1281 let result = run_stream(stream, &sink, RunStreamOptions::new().with_cancel(token))
1282 .await
1283 .expect("a cooperative cancel returns Ok with the partial result");
1284
1285 assert_eq!(result.records_written, 2);
1288 assert_eq!(sink.written().len(), 2);
1289 assert!(
1290 sink.flush_count() >= 1,
1291 "sink must be flushed on the cancel path so partial output is durable"
1292 );
1293 }
1294
1295 #[tokio::test]
1296 async fn stream_pipeline_empty() {
1297 let pages: Vec<Result<StreamPage, FaucetError>> = vec![];
1298 let stream = futures::stream::iter(pages);
1299 let sink = MockSink::new();
1300
1301 let result = run_stream(stream, &sink, RunStreamOptions::new())
1302 .await
1303 .unwrap();
1304
1305 assert_eq!(result.records_written, 0);
1306 }
1307
1308 #[tokio::test]
1309 async fn stream_pipeline_skips_empty_pages() {
1310 let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1311 Ok(StreamPage {
1312 records: vec![json!({"id": 1})],
1313 bookmark: None,
1314 }),
1315 Ok(StreamPage {
1316 records: vec![],
1317 bookmark: None,
1318 }),
1319 Ok(StreamPage {
1320 records: vec![json!({"id": 2})],
1321 bookmark: None,
1322 }),
1323 ];
1324 let stream = futures::stream::iter(pages);
1325 let sink = MockSink::new();
1326
1327 let result = run_stream(stream, &sink, RunStreamOptions::new())
1328 .await
1329 .unwrap();
1330
1331 assert_eq!(result.records_written, 2);
1332 }
1333
1334 #[tokio::test]
1335 async fn stream_pipeline_error_in_page_propagates() {
1336 let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1337 Ok(StreamPage {
1338 records: vec![json!({"id": 1})],
1339 bookmark: None,
1340 }),
1341 Err(FaucetError::HttpStatus {
1342 status: 500,
1343 url: "https://example.com".into(),
1344 body: "Internal Server Error".into(),
1345 }),
1346 ];
1347 let stream = futures::stream::iter(pages);
1348 let sink = MockSink::new();
1349
1350 let result = run_stream(stream, &sink, RunStreamOptions::new()).await;
1351 assert!(result.is_err());
1352 assert_eq!(sink.written().len(), 1);
1354 }
1355
1356 #[tokio::test]
1357 async fn stream_pipeline_sink_error_propagates() {
1358 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1359 records: vec![json!({"id": 1})],
1360 bookmark: None,
1361 })];
1362 let stream = futures::stream::iter(pages);
1363 let sink = FailingSink;
1364
1365 let result = run_stream(stream, &sink, RunStreamOptions::new()).await;
1366 assert!(result.is_err());
1367 }
1368
1369 #[tokio::test]
1370 async fn stream_pipeline_with_trait_object_sink() {
1371 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1372 records: vec![json!({"id": 1})],
1373 bookmark: None,
1374 })];
1375 let stream = futures::stream::iter(pages);
1376 let sink: Box<dyn Sink> = Box::new(MockSink::new());
1377
1378 let result = run_stream(stream, sink.as_ref(), RunStreamOptions::new())
1379 .await
1380 .unwrap();
1381 assert_eq!(result.records_written, 1);
1382 }
1383
1384 #[tokio::test]
1385 async fn stream_pipeline_persists_bookmark_when_page_carries_one() {
1386 let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1387 let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1388 Ok(StreamPage {
1389 records: vec![json!({"id": 1})],
1390 bookmark: None,
1391 }),
1392 Ok(StreamPage {
1393 records: vec![json!({"id": 2})],
1394 bookmark: Some(json!("checkpoint-final")),
1395 }),
1396 ];
1397 let stream = futures::stream::iter(pages);
1398 let sink = MockSink::new();
1399
1400 let result = run_stream(
1401 stream,
1402 &sink,
1403 RunStreamOptions::new().with_state(Arc::clone(&store), "k"),
1404 )
1405 .await
1406 .unwrap();
1407
1408 assert_eq!(result.records_written, 2);
1409 assert_eq!(result.bookmark, Some(json!("checkpoint-final")));
1410 assert_eq!(
1411 store.get("k").await.unwrap(),
1412 Some(json!("checkpoint-final"))
1413 );
1414 }
1415
1416 #[tokio::test]
1417 async fn stream_pipeline_persists_per_page_bookmarks() {
1418 let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1419 let pages: Vec<Result<StreamPage, FaucetError>> = vec![
1420 Ok(StreamPage {
1421 records: vec![json!({"id": 1})],
1422 bookmark: Some(json!("tx-1")),
1423 }),
1424 Ok(StreamPage {
1425 records: vec![json!({"id": 2})],
1426 bookmark: Some(json!("tx-2")),
1427 }),
1428 ];
1429 let stream = futures::stream::iter(pages);
1430 let sink = MockSink::new();
1431
1432 run_stream(
1433 stream,
1434 &sink,
1435 RunStreamOptions::new().with_state(Arc::clone(&store), "k"),
1436 )
1437 .await
1438 .unwrap();
1439
1440 assert_eq!(store.get("k").await.unwrap(), Some(json!("tx-2")));
1442 }
1443
1444 use crate::state::{FileStateStore, MemoryStateStore, StateStore};
1447 use std::sync::Arc;
1448 use tempfile::TempDir;
1449
1450 struct StatefulSource {
1454 key: String,
1455 records: Vec<Value>,
1456 new_bookmark: Value,
1457 seen_bookmark: std::sync::Mutex<Option<Value>>,
1458 }
1459
1460 impl StatefulSource {
1461 fn new(key: &str, records: Vec<Value>, new_bookmark: Value) -> Self {
1462 Self {
1463 key: key.into(),
1464 records,
1465 new_bookmark,
1466 seen_bookmark: std::sync::Mutex::new(None),
1467 }
1468 }
1469 fn observed_start(&self) -> Option<Value> {
1470 self.seen_bookmark.lock().unwrap().clone()
1471 }
1472 }
1473
1474 #[async_trait]
1475 impl Source for StatefulSource {
1476 async fn fetch_with_context(
1477 &self,
1478 _ctx: &std::collections::HashMap<String, Value>,
1479 ) -> Result<Vec<Value>, FaucetError> {
1480 Ok(self.records.clone())
1481 }
1482 async fn fetch_with_context_incremental(
1483 &self,
1484 _ctx: &std::collections::HashMap<String, Value>,
1485 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
1486 Ok((self.records.clone(), Some(self.new_bookmark.clone())))
1487 }
1488 fn state_key(&self) -> Option<String> {
1489 Some(self.key.clone())
1490 }
1491 async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
1492 *self.seen_bookmark.lock().unwrap() = Some(bookmark);
1493 Ok(())
1494 }
1495 }
1496
1497 #[tokio::test]
1498 async fn pipeline_with_state_store_persists_bookmark_after_sink() {
1499 let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1500 let source = StatefulSource::new(
1501 "github_issues",
1502 vec![json!({"id": 1, "ts": "2026-05-01"})],
1503 json!("2026-05-01"),
1504 );
1505 let sink = MockSink::new();
1506 let result = Pipeline::new(&source, &sink)
1507 .with_state_store(Arc::clone(&store))
1508 .run()
1509 .await
1510 .unwrap();
1511
1512 assert_eq!(result.records_written, 1);
1513 assert_eq!(result.bookmark, Some(json!("2026-05-01")));
1514 let stored = store.get("github_issues").await.unwrap();
1516 assert_eq!(stored, Some(json!("2026-05-01")));
1517 }
1518
1519 #[tokio::test]
1520 async fn pipeline_with_state_store_resumes_from_stored_bookmark() {
1521 let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1522 store
1523 .put("github_issues", &json!("2026-04-30"))
1524 .await
1525 .unwrap();
1526
1527 let source =
1528 StatefulSource::new("github_issues", vec![json!({"id": 2})], json!("2026-05-01"));
1529 let sink = MockSink::new();
1530 Pipeline::new(&source, &sink)
1531 .with_state_store(Arc::clone(&store))
1532 .run()
1533 .await
1534 .unwrap();
1535
1536 assert_eq!(source.observed_start(), Some(json!("2026-04-30")));
1538 assert_eq!(
1540 store.get("github_issues").await.unwrap(),
1541 Some(json!("2026-05-01"))
1542 );
1543 }
1544
1545 #[tokio::test]
1546 async fn pipeline_with_state_store_does_not_persist_when_sink_fails() {
1547 let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1548 let source = StatefulSource::new("k", vec![json!({"id": 1})], json!("2026-05-01"));
1549 let sink = FailingSink;
1550
1551 let result = Pipeline::new(&source, &sink)
1552 .with_state_store(Arc::clone(&store))
1553 .run()
1554 .await;
1555 assert!(result.is_err());
1556 assert!(store.get("k").await.unwrap().is_none());
1557 }
1558
1559 #[tokio::test]
1560 async fn pipeline_with_state_store_no_state_key_means_no_persist() {
1561 let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1562 let source = IncrementalSource {
1563 records: vec![json!({"id": 1})],
1564 bookmark: json!("ignored"),
1565 };
1566 let sink = MockSink::new();
1567 Pipeline::new(&source, &sink)
1568 .with_state_store(Arc::clone(&store))
1569 .run()
1570 .await
1571 .unwrap();
1572 assert!(store.get("anything").await.unwrap().is_none());
1575 }
1576
1577 #[tokio::test]
1578 async fn pipeline_with_state_store_skips_persist_when_bookmark_is_none() {
1579 let store: Arc<dyn StateStore> = Arc::new(MemoryStateStore::new());
1580 struct NoBookmarkSource;
1581 #[async_trait]
1582 impl Source for NoBookmarkSource {
1583 async fn fetch_with_context(
1584 &self,
1585 _ctx: &std::collections::HashMap<String, Value>,
1586 ) -> Result<Vec<Value>, FaucetError> {
1587 Ok(vec![json!({"id": 1})])
1588 }
1589 fn state_key(&self) -> Option<String> {
1590 Some("k".into())
1591 }
1592 }
1593 let source = NoBookmarkSource;
1594 let sink = MockSink::new();
1595 Pipeline::new(&source, &sink)
1596 .with_state_store(Arc::clone(&store))
1597 .run()
1598 .await
1599 .unwrap();
1600 assert!(store.get("k").await.unwrap().is_none());
1601 }
1602
1603 struct PagedSource;
1608
1609 #[async_trait]
1610 impl Source for PagedSource {
1611 async fn fetch_with_context(
1612 &self,
1613 _ctx: &std::collections::HashMap<String, Value>,
1614 ) -> Result<Vec<Value>, FaucetError> {
1615 unreachable!("Pipeline::run must drive stream_pages, not fetch_with_context");
1617 }
1618 fn stream_pages<'a>(
1619 &'a self,
1620 _ctx: &'a std::collections::HashMap<String, Value>,
1621 _batch_size: usize,
1622 ) -> std::pin::Pin<
1623 Box<dyn futures_core::Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>,
1624 > {
1625 Box::pin(async_stream::try_stream! {
1626 yield StreamPage { records: vec![json!({"i": 1})], bookmark: None };
1627 yield StreamPage { records: vec![json!({"i": 2})], bookmark: None };
1628 yield StreamPage { records: vec![json!({"i": 3})], bookmark: Some(json!("final")) };
1629 })
1630 }
1631 }
1632
1633 struct CountingSink {
1635 calls: std::sync::Mutex<Vec<usize>>,
1636 }
1637
1638 impl CountingSink {
1639 fn new() -> Self {
1640 Self {
1641 calls: std::sync::Mutex::new(Vec::new()),
1642 }
1643 }
1644 fn call_count(&self) -> usize {
1645 self.calls.lock().unwrap().len()
1646 }
1647 }
1648
1649 #[async_trait]
1650 impl Sink for CountingSink {
1651 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
1652 self.calls.lock().unwrap().push(records.len());
1653 Ok(records.len())
1654 }
1655 }
1656
1657 #[tokio::test]
1658 async fn pipeline_run_drives_stream_pages() {
1659 let source = PagedSource;
1660 let sink = CountingSink::new();
1661
1662 let result = Pipeline::new(&source, &sink).run().await.unwrap();
1663
1664 assert_eq!(sink.call_count(), 3);
1666 assert_eq!(result.records_written, 3);
1667 assert_eq!(result.bookmark, Some(json!("final")));
1668 }
1669
1670 #[tokio::test]
1671 async fn pipeline_with_file_state_store_round_trips_across_runs() {
1672 let dir = TempDir::new().unwrap();
1673 let store: Arc<dyn StateStore> = Arc::new(FileStateStore::new(dir.path()));
1674
1675 let s1 = StatefulSource::new("k", vec![json!({"i": 1})], json!("v1"));
1677 let sink1 = MockSink::new();
1678 Pipeline::new(&s1, &sink1)
1679 .with_state_store(Arc::clone(&store))
1680 .run()
1681 .await
1682 .unwrap();
1683 assert_eq!(s1.observed_start(), None);
1684 assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
1685
1686 let s2 = StatefulSource::new("k", vec![json!({"i": 2})], json!("v2"));
1688 let sink2 = MockSink::new();
1689 Pipeline::new(&s2, &sink2)
1690 .with_state_store(Arc::clone(&store))
1691 .run()
1692 .await
1693 .unwrap();
1694 assert_eq!(s2.observed_start(), Some(json!("v1")));
1695 assert_eq!(store.get("k").await.unwrap(), Some(json!("v2")));
1696 }
1697
1698 #[tokio::test]
1699 #[allow(clippy::await_holding_lock)]
1700 async fn pipeline_run_increments_runs_total() {
1701 use crate::observability::decorator::source_tests::{LOCK, snapshotter};
1702 use metrics_util::debugging::DebugValue;
1703 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1704 let snap = snapshotter();
1705
1706 let source = MockSource(vec![json!({"i": 1})]);
1707 let sink = MockSink::new();
1708 let _ = Pipeline::new(&source, &sink)
1709 .with_name("test-pipeline")
1710 .with_row("rowA")
1711 .run()
1712 .await
1713 .unwrap();
1714
1715 let snapshot = snap.snapshot();
1716 let found = snapshot.into_vec().into_iter().any(
1717 |(key, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
1718 key.key().name() == "faucet_pipeline_runs_total"
1719 && key.key().labels().any(|l: &metrics::Label| {
1720 l.key() == "pipeline" && l.value() == "test-pipeline"
1721 })
1722 && key
1723 .key()
1724 .labels()
1725 .any(|l: &metrics::Label| l.key() == "row" && l.value() == "rowA")
1726 && key
1727 .key()
1728 .labels()
1729 .any(|l: &metrics::Label| l.key() == "status" && l.value() == "ok")
1730 && matches!(v, DebugValue::Counter(c) if c >= 1)
1731 },
1732 );
1733 assert!(
1734 found,
1735 "expected faucet_pipeline_runs_total{{pipeline=test-pipeline, row=rowA, status=ok}}"
1736 );
1737 }
1738
1739 #[tokio::test]
1740 #[allow(clippy::await_holding_lock)]
1741 async fn pipeline_failure_attaches_kind_label_to_runs_total() {
1742 use crate::observability::decorator::source_tests::{LOCK, snapshotter};
1743 use metrics_util::debugging::DebugValue;
1744 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1745 let snap = snapshotter();
1746
1747 let source = FailingSource;
1748 let sink = MockSink::new();
1749 let _ = Pipeline::new(&source, &sink)
1750 .with_name("err-pipeline")
1751 .with_row("rowE")
1752 .run()
1753 .await;
1754
1755 let snapshot = snap.snapshot();
1756 let found = snapshot.into_vec().into_iter().any(
1757 |(key, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
1758 key.key().name() == "faucet_pipeline_runs_total"
1759 && key.key().labels().any(|l: &metrics::Label| {
1760 l.key() == "pipeline" && l.value() == "err-pipeline"
1761 })
1762 && key
1763 .key()
1764 .labels()
1765 .any(|l: &metrics::Label| l.key() == "status" && l.value() == "err")
1766 && key
1767 .key()
1768 .labels()
1769 .any(|l: &metrics::Label| l.key() == "kind" && l.value() == "Auth")
1770 && matches!(v, DebugValue::Counter(c) if c >= 1)
1771 },
1772 );
1773 assert!(
1774 found,
1775 "expected faucet_pipeline_runs_total{{status=err, kind=Auth}} for failing source"
1776 );
1777 }
1778
1779 #[tokio::test]
1780 #[allow(clippy::await_holding_lock)]
1781 async fn pipeline_run_emits_start_time_gauge() {
1782 use crate::observability::decorator::source_tests::{LOCK, snapshotter};
1783 use metrics_util::debugging::DebugValue;
1784 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1785 let snap = snapshotter();
1786
1787 let source = MockSource(vec![json!({"i": 1})]);
1788 let sink = MockSink::new();
1789 let before = std::time::SystemTime::now()
1790 .duration_since(std::time::UNIX_EPOCH)
1791 .map(|d| d.as_secs_f64())
1792 .unwrap_or(0.0);
1793 let _ = Pipeline::new(&source, &sink)
1794 .with_name("start-time-pipeline")
1795 .with_row("rowS")
1796 .run()
1797 .await
1798 .unwrap();
1799
1800 let snapshot = snap.snapshot();
1801 let found = snapshot.into_vec().into_iter().any(
1802 |(key, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
1803 if key.key().name() != "faucet_pipeline_start_time_unix_seconds" {
1804 return false;
1805 }
1806 let labels_match = key.key().labels().any(|l: &metrics::Label| {
1807 l.key() == "pipeline" && l.value() == "start-time-pipeline"
1808 }) && key
1809 .key()
1810 .labels()
1811 .any(|l: &metrics::Label| l.key() == "row" && l.value() == "rowS");
1812 if !labels_match {
1813 return false;
1814 }
1815 matches!(v, DebugValue::Gauge(g) if g.into_inner() >= before)
1816 },
1817 );
1818 assert!(
1819 found,
1820 "expected faucet_pipeline_start_time_unix_seconds gauge >= test-start timestamp"
1821 );
1822 }
1823
1824 #[tokio::test]
1825 #[allow(clippy::await_holding_lock)]
1826 async fn register_build_info_sets_version_gauge() {
1827 use crate::observability::decorator::source_tests::{LOCK, snapshotter};
1828 use metrics_util::debugging::DebugValue;
1829 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
1830 let snap = snapshotter();
1831
1832 crate::observability::register_build_info();
1833
1834 let snapshot = snap.snapshot();
1835 let found = snapshot.into_vec().into_iter().any(
1836 |(key, _u, _d, v): (metrics_util::CompositeKey, _, _, _)| {
1837 key.key().name() == "faucet_build_info"
1838 && key.key().labels().any(|l: &metrics::Label| {
1839 l.key() == "version" && l.value() == env!("CARGO_PKG_VERSION")
1840 })
1841 && matches!(v, DebugValue::Gauge(g) if (g.into_inner() - 1.0).abs() < f64::EPSILON)
1842 },
1843 );
1844 assert!(
1845 found,
1846 "expected faucet_build_info{{version=CARGO_PKG_VERSION}} = 1.0 after register_build_info()"
1847 );
1848 }
1849
1850 use crate::dlq::{DlqConfig, OnBatchError};
1853
1854 struct PartialSink {
1858 fail_indices: std::sync::Mutex<Vec<usize>>,
1859 committed: std::sync::Mutex<Vec<Value>>,
1860 }
1861
1862 impl PartialSink {
1863 fn new(fail_indices: Vec<usize>) -> Self {
1864 Self {
1865 fail_indices: std::sync::Mutex::new(fail_indices),
1866 committed: std::sync::Mutex::new(Vec::new()),
1867 }
1868 }
1869 }
1870
1871 #[async_trait]
1872 impl Sink for PartialSink {
1873 async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
1874 unreachable!("PartialSink only overrides write_batch_partial");
1875 }
1876 async fn write_batch_partial(
1877 &self,
1878 records: &[Value],
1879 ) -> Result<Vec<crate::traits::RowOutcome>, FaucetError> {
1880 let fails: std::collections::HashSet<usize> =
1881 self.fail_indices.lock().unwrap().iter().copied().collect();
1882 let mut outcomes = Vec::with_capacity(records.len());
1883 for (i, rec) in records.iter().enumerate() {
1884 if fails.contains(&i) {
1885 outcomes.push(Err(FaucetError::Sink(format!("row {i} rejected"))));
1886 } else {
1887 self.committed.lock().unwrap().push(rec.clone());
1888 outcomes.push(Ok(()));
1889 }
1890 }
1891 Ok(outcomes)
1892 }
1893 }
1894
1895 #[tokio::test]
1896 async fn dlq_routes_only_failed_rows_for_partial_success_sink() {
1897 let main = PartialSink::new(vec![1, 3]); let dlq = std::sync::Arc::new(MockSink::new());
1899 let dlq_cfg = DlqConfig::new(dlq.clone());
1900
1901 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1902 records: (0..4).map(|i| json!({"i": i})).collect(),
1903 bookmark: None,
1904 })];
1905 let stream = futures::stream::iter(pages);
1906 let result = run_stream(stream, &main, RunStreamOptions::new().with_dlq(dlq_cfg))
1907 .await
1908 .unwrap();
1909
1910 assert_eq!(result.records_written, 2); assert_eq!(main.committed.lock().unwrap().len(), 2);
1912 let envelopes = dlq.0.lock().unwrap();
1913 assert_eq!(envelopes.len(), 2);
1914 assert_eq!(envelopes[0]["payload"]["i"], 1);
1915 assert_eq!(envelopes[0]["record_index"], 1);
1916 assert_eq!(envelopes[1]["payload"]["i"], 3);
1917 assert_eq!(envelopes[1]["record_index"], 3);
1918 let stats = result.dlq.unwrap();
1919 assert_eq!(stats.records_dlq, 2);
1920 assert_eq!(stats.pages_with_failures, 1);
1921 }
1922
1923 #[tokio::test]
1924 async fn dlq_propagate_policy_bubbles_outer_err() {
1925 let main = FailingSink;
1926 let dlq = std::sync::Arc::new(MockSink::new());
1927 let mut dlq_cfg = DlqConfig::new(dlq.clone());
1928 dlq_cfg.on_batch_error = OnBatchError::Propagate;
1929
1930 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1931 records: vec![json!({"i": 0}), json!({"i": 1})],
1932 bookmark: Some(json!("v1")),
1933 })];
1934 let stream = futures::stream::iter(pages);
1935 let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
1936 let result = run_stream(
1937 stream,
1938 &main,
1939 RunStreamOptions::new()
1940 .with_dlq(dlq_cfg)
1941 .with_state(std::sync::Arc::clone(&store), "k"),
1942 )
1943 .await;
1944 assert!(matches!(result, Err(FaucetError::Sink(_))));
1945 assert!(dlq.0.lock().unwrap().is_empty());
1946 assert!(store.get("k").await.unwrap().is_none());
1948 }
1949
1950 #[tokio::test]
1951 async fn dlq_dlq_all_policy_routes_every_row_on_outer_err() {
1952 let main = FailingSink;
1953 let dlq = std::sync::Arc::new(MockSink::new());
1954 let mut dlq_cfg = DlqConfig::new(dlq.clone());
1955 dlq_cfg.on_batch_error = OnBatchError::DlqAll;
1956
1957 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1958 records: vec![json!({"i": 0}), json!({"i": 1}), json!({"i": 2})],
1959 bookmark: Some(json!("v1")),
1960 })];
1961 let stream = futures::stream::iter(pages);
1962 let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
1963 let result = run_stream(
1964 stream,
1965 &main,
1966 RunStreamOptions::new()
1967 .with_dlq(dlq_cfg)
1968 .with_state(std::sync::Arc::clone(&store), "k"),
1969 )
1970 .await
1971 .unwrap();
1972 assert_eq!(result.records_written, 0);
1973 {
1974 let envelopes = dlq.0.lock().unwrap();
1975 assert_eq!(envelopes.len(), 3);
1976 for env in envelopes.iter() {
1978 let msg = env["error"]["message"].as_str().unwrap();
1979 assert!(msg.contains("write failed"), "got: {msg}");
1980 }
1981 }
1982 assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
1983 assert_eq!(result.dlq.unwrap().records_dlq, 3);
1984 }
1985
1986 #[tokio::test]
1987 async fn dlq_per_page_budget_exceeded_aborts() {
1988 let main = PartialSink::new(vec![0, 1, 2]);
1989 let dlq = std::sync::Arc::new(MockSink::new());
1990 let mut dlq_cfg = DlqConfig::new(dlq.clone());
1991 dlq_cfg.max_failures_per_page = Some(2);
1992
1993 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
1994 records: (0..3).map(|i| json!({"i": i})).collect(),
1995 bookmark: None,
1996 })];
1997 let stream = futures::stream::iter(pages);
1998 let result = run_stream(stream, &main, RunStreamOptions::new().with_dlq(dlq_cfg)).await;
1999 assert!(
2000 matches!(&result, Err(FaucetError::Sink(m)) if m.contains("per-page budget exceeded")),
2001 "got: {result:?}"
2002 );
2003 }
2004
2005 #[tokio::test]
2006 async fn dlq_total_budget_exceeded_aborts_on_later_page() {
2007 let pages: Vec<Result<StreamPage, FaucetError>> = vec![
2008 Ok(StreamPage {
2009 records: (0..3).map(|i| json!({"i": i})).collect(),
2010 bookmark: None,
2011 }),
2012 Ok(StreamPage {
2013 records: (3..6).map(|i| json!({"i": i})).collect(),
2014 bookmark: None,
2015 }),
2016 ];
2017 let main = PartialSink::new(vec![0, 1, 2]); let dlq = std::sync::Arc::new(MockSink::new());
2020 let mut dlq_cfg = DlqConfig::new(dlq.clone());
2021 dlq_cfg.max_failures_total = Some(4);
2022
2023 let stream = futures::stream::iter(pages);
2024 let result = run_stream(stream, &main, RunStreamOptions::new().with_dlq(dlq_cfg)).await;
2025 assert!(
2026 matches!(&result, Err(FaucetError::Sink(m)) if m.contains("total budget exceeded")),
2027 "got: {result:?}"
2028 );
2029 }
2030
2031 #[tokio::test]
2032 async fn dlq_per_page_budget_exceeded_commits_page_before_aborting() {
2033 let main = PartialSink::new(vec![1, 2]); let dlq = std::sync::Arc::new(MockSink::new());
2040 let mut dlq_cfg = DlqConfig::new(dlq.clone());
2041 dlq_cfg.max_failures_per_page = Some(1); let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2044 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2045 records: (0..3).map(|i| json!({ "i": i })).collect(),
2046 bookmark: Some(json!("v1")),
2047 })];
2048 let stream = futures::stream::iter(pages);
2049 let result = run_stream(
2050 stream,
2051 &main,
2052 RunStreamOptions::new()
2053 .with_dlq(dlq_cfg)
2054 .with_state(std::sync::Arc::clone(&store), "k"),
2055 )
2056 .await;
2057
2058 assert!(
2060 matches!(&result, Err(FaucetError::Sink(m)) if m.contains("per-page budget exceeded")),
2061 "got: {result:?}"
2062 );
2063 assert_eq!(main.committed.lock().unwrap().len(), 1);
2065 assert_eq!(dlq.0.lock().unwrap().len(), 2);
2067 assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
2069 }
2070
2071 #[tokio::test]
2072 async fn dlq_total_budget_exceeded_commits_tripping_page_before_aborting() {
2073 let main = PartialSink::new(vec![1, 2]); let dlq = std::sync::Arc::new(MockSink::new());
2078 let mut dlq_cfg = DlqConfig::new(dlq.clone());
2079 dlq_cfg.max_failures_total = Some(1); let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2082 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2083 records: (0..3).map(|i| json!({ "i": i })).collect(),
2084 bookmark: Some(json!("v1")),
2085 })];
2086 let stream = futures::stream::iter(pages);
2087 let result = run_stream(
2088 stream,
2089 &main,
2090 RunStreamOptions::new()
2091 .with_dlq(dlq_cfg)
2092 .with_state(std::sync::Arc::clone(&store), "k"),
2093 )
2094 .await;
2095
2096 assert!(
2097 matches!(&result, Err(FaucetError::Sink(m)) if m.contains("total budget exceeded")),
2098 "got: {result:?}"
2099 );
2100 assert_eq!(main.committed.lock().unwrap().len(), 1);
2101 assert_eq!(dlq.0.lock().unwrap().len(), 2);
2102 assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
2103 }
2104
2105 struct FailingDlqSink;
2108 #[async_trait]
2109 impl Sink for FailingDlqSink {
2110 async fn write_batch(&self, _records: &[Value]) -> Result<usize, FaucetError> {
2111 Err(FaucetError::Sink("dlq disk full".into()))
2112 }
2113 }
2114
2115 struct FailingFlushDlqSink {
2119 written: std::sync::Mutex<Vec<Value>>,
2120 }
2121 impl FailingFlushDlqSink {
2122 fn new() -> Self {
2123 Self {
2124 written: std::sync::Mutex::new(Vec::new()),
2125 }
2126 }
2127 }
2128 #[async_trait]
2129 impl Sink for FailingFlushDlqSink {
2130 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
2131 self.written.lock().unwrap().extend(records.iter().cloned());
2132 Ok(records.len())
2133 }
2134 async fn flush(&self) -> Result<(), FaucetError> {
2135 Err(FaucetError::Sink("dlq flush failed".into()))
2136 }
2137 }
2138
2139 #[tokio::test]
2140 async fn dlq_sink_failure_is_fatal_no_recursion() {
2141 let main = PartialSink::new(vec![0]);
2142 let dlq: std::sync::Arc<dyn Sink> = std::sync::Arc::new(FailingDlqSink);
2143 let dlq_cfg = DlqConfig::new(dlq);
2144
2145 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2146 records: vec![json!({"i": 0}), json!({"i": 1})],
2147 bookmark: Some(json!("v1")),
2148 })];
2149 let stream = futures::stream::iter(pages);
2150 let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2151 let result = run_stream(
2152 stream,
2153 &main,
2154 RunStreamOptions::new()
2155 .with_dlq(dlq_cfg)
2156 .with_state(std::sync::Arc::clone(&store), "k"),
2157 )
2158 .await;
2159 assert!(
2160 matches!(&result, Err(FaucetError::Sink(m)) if m.contains("DLQ sink write failed")),
2161 "got: {result:?}"
2162 );
2163 assert!(store.get("k").await.unwrap().is_none());
2164 }
2165
2166 #[tokio::test]
2167 async fn dlq_bookmark_advances_only_after_both_flushes() {
2168 let main = PartialSink::new(vec![1]); let dlq = std::sync::Arc::new(MockSink::new());
2170 let dlq_cfg = DlqConfig::new(dlq.clone());
2171
2172 let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2173 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2174 records: vec![json!({"i": 0}), json!({"i": 1})],
2175 bookmark: Some(json!("v1")),
2176 })];
2177 let stream = futures::stream::iter(pages);
2178 run_stream(
2179 stream,
2180 &main,
2181 RunStreamOptions::new()
2182 .with_dlq(dlq_cfg)
2183 .with_state(std::sync::Arc::clone(&store), "k"),
2184 )
2185 .await
2186 .unwrap();
2187 assert_eq!(store.get("k").await.unwrap(), Some(json!("v1")));
2188 assert_eq!(dlq.0.lock().unwrap().len(), 1);
2189 assert_eq!(main.committed.lock().unwrap().len(), 1);
2190 }
2191
2192 #[tokio::test]
2193 async fn dlq_disabled_pipeline_behaves_identically_to_today() {
2194 let main = MockSink::new();
2196 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2197 records: vec![json!({"i": 0}), json!({"i": 1})],
2198 bookmark: None,
2199 })];
2200 let stream = futures::stream::iter(pages);
2201 let result = run_stream(stream, &main, RunStreamOptions::new())
2202 .await
2203 .unwrap();
2204 assert_eq!(result.records_written, 2);
2205 assert!(result.dlq.is_none());
2206 }
2207
2208 #[tokio::test]
2209 async fn dlq_per_page_flush_failure_is_fatal_and_blocks_bookmark() {
2210 let main = PartialSink::new(vec![1]);
2215 let dlq: std::sync::Arc<dyn Sink> = std::sync::Arc::new(FailingFlushDlqSink::new());
2216 let dlq_cfg = DlqConfig::new(dlq);
2217
2218 let store: std::sync::Arc<dyn StateStore> = std::sync::Arc::new(MemoryStateStore::new());
2219 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2220 records: vec![json!({"i": 0}), json!({"i": 1})],
2221 bookmark: Some(json!("v1")),
2222 })];
2223 let stream = futures::stream::iter(pages);
2224 let result = run_stream(
2225 stream,
2226 &main,
2227 RunStreamOptions::new()
2228 .with_dlq(dlq_cfg)
2229 .with_state(std::sync::Arc::clone(&store), "k"),
2230 )
2231 .await;
2232 assert!(
2233 matches!(&result, Err(FaucetError::Sink(m)) if m.contains("DLQ sink flush failed")),
2234 "got: {result:?}"
2235 );
2236 assert!(store.get("k").await.unwrap().is_none());
2237 }
2238
2239 #[tokio::test]
2240 async fn dlq_end_of_stream_flush_failure_is_fatal() {
2241 let main = PartialSink::new(vec![1]);
2246 let dlq: std::sync::Arc<dyn Sink> = std::sync::Arc::new(FailingFlushDlqSink::new());
2247 let dlq_cfg = DlqConfig::new(dlq);
2248
2249 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2250 records: vec![json!({"i": 0}), json!({"i": 1})],
2251 bookmark: None,
2252 })];
2253 let stream = futures::stream::iter(pages);
2254 let result = run_stream(stream, &main, RunStreamOptions::new().with_dlq(dlq_cfg)).await;
2255 assert!(
2256 matches!(&result, Err(FaucetError::Sink(m)) if m.contains("DLQ sink flush failed")),
2257 "got: {result:?}"
2258 );
2259 }
2260
2261 #[tokio::test]
2262 #[allow(clippy::await_holding_lock)]
2263 async fn dlq_emits_records_total_and_pages_total() {
2264 use crate::observability::decorator::source_tests::{LOCK, snapshotter};
2265 use metrics_util::debugging::DebugValue;
2266
2267 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
2268 let snap = snapshotter();
2269
2270 let source = MockSource(vec![json!({"i": 0}), json!({"i": 1})]);
2271 let main = PartialSink::new(vec![1]);
2272 let dlq = std::sync::Arc::new(MockSink::new());
2273 let _ = Pipeline::new(&source, &main)
2274 .with_name("p_dlq_metrics")
2275 .with_row("r1")
2276 .with_dlq(DlqConfig::new(dlq.clone()))
2277 .run()
2278 .await
2279 .unwrap();
2280
2281 let snapshot = snap.snapshot();
2282 let mut saw_records = false;
2283 let mut saw_pages = false;
2284 for (k, _u, _d, v) in snapshot.into_vec() {
2285 let key = k.key();
2286 let labels = key.labels().collect::<Vec<_>>();
2287 let has = |k: &str, v: &str| labels.iter().any(|l| l.key() == k && l.value() == v);
2288 if key.name() == "faucet_sink_dlq_records_total"
2289 && has("pipeline", "p_dlq_metrics")
2290 && has("row", "r1")
2291 && matches!(v, DebugValue::Counter(c) if c >= 1)
2292 {
2293 saw_records = true;
2294 }
2295 if key.name() == "faucet_sink_dlq_pages_total"
2296 && has("pipeline", "p_dlq_metrics")
2297 && matches!(v, DebugValue::Counter(c) if c >= 1)
2298 {
2299 saw_pages = true;
2300 }
2301 }
2302 assert!(saw_records, "faucet_sink_dlq_records_total not emitted");
2303 assert!(saw_pages, "faucet_sink_dlq_pages_total not emitted");
2304 }
2305
2306 #[tokio::test]
2307 #[allow(clippy::await_holding_lock)]
2308 async fn dlq_budget_exceeded_emits_counter() {
2309 use crate::observability::decorator::source_tests::{LOCK, snapshotter};
2310 use metrics_util::debugging::DebugValue;
2311
2312 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
2313 let snap = snapshotter();
2314
2315 let source = MockSource((0..3).map(|i| json!({"i": i})).collect());
2316 let main = PartialSink::new(vec![0, 1, 2]);
2317 let dlq = std::sync::Arc::new(MockSink::new());
2318 let mut cfg = DlqConfig::new(dlq);
2319 cfg.max_failures_per_page = Some(1);
2320 let _ = Pipeline::new(&source, &main)
2321 .with_name("p_budget")
2322 .with_dlq(cfg)
2323 .run()
2324 .await;
2325
2326 let snapshot = snap.snapshot();
2327 let saw = snapshot.into_vec().into_iter().any(|(k, _, _, v)| {
2328 k.key().name() == "faucet_sink_dlq_budget_exceeded_total"
2329 && k.key()
2330 .labels()
2331 .any(|l| l.key() == "scope" && l.value() == "per_page")
2332 && matches!(v, DebugValue::Counter(c) if c >= 1)
2333 });
2334 assert!(saw, "faucet_sink_dlq_budget_exceeded_total not emitted");
2335 }
2336
2337 #[tokio::test]
2338 async fn pipeline_run_with_dlq_routes_partial_failures_end_to_end() {
2339 let source = MockSource(vec![json!({"i": 0}), json!({"i": 1}), json!({"i": 2})]);
2341 let main = PartialSink::new(vec![1]);
2342 let dlq = std::sync::Arc::new(MockSink::new());
2343
2344 let result = Pipeline::new(&source, &main)
2345 .with_dlq(DlqConfig::new(dlq.clone()))
2346 .run()
2347 .await
2348 .unwrap();
2349
2350 assert_eq!(result.records_written, 2);
2351 let stats = result.dlq.unwrap();
2352 assert_eq!(stats.records_dlq, 1);
2353 {
2354 let dlq_records = dlq.0.lock().unwrap();
2355 assert_eq!(dlq_records.len(), 1);
2356 }
2357 }
2358
#[cfg(feature = "quality")]
#[tokio::test]
async fn quality_quarantines_to_dlq_and_writes_survivors() {
    use crate::dlq::DlqConfig;
    use crate::quality::{CompiledQuality, OnFailure, QualitySpec, RecordCheck};

    // One record-level rule: `id` must be non-null, otherwise quarantine the row.
    let not_null_id = RecordCheck::NotNull {
        field: "id".into(),
        treat_missing_as_null: true,
        on_failure: OnFailure::Quarantine,
    };
    let spec = QualitySpec {
        record: vec![not_null_id],
        batch: vec![],
    };
    let quality = Arc::new(CompiledQuality::compile(&spec).unwrap());

    let main = Arc::new(MockSink::new());
    let dlq_sink = Arc::new(MockSink::new());
    let opts = RunStreamOptions::new()
        .with_dlq(DlqConfig::new(dlq_sink.clone()))
        .with_quality(quality);

    let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
        records: vec![json!({"id": 1}), json!({"id": null}), json!({"id": 3})],
        bookmark: None,
    })];
    let result = run_stream(futures::stream::iter(pages), main.as_ref(), opts)
        .await
        .unwrap();

    // Survivors reach the main sink in order; the null-id row goes to the DLQ.
    assert_eq!(result.records_written, 2);
    assert_eq!(main.written(), vec![json!({"id": 1}), json!({"id": 3})]);
    let dlq = dlq_sink.written();
    assert_eq!(dlq.len(), 1);
    assert_eq!(dlq[0]["error"]["kind"], "QualityFailure");
    assert_eq!(result.dlq.unwrap().records_dlq, 1);
}
2397
2398 #[cfg(feature = "quality")]
2399 #[tokio::test]
2400 #[allow(clippy::await_holding_lock)]
2401 async fn quality_only_page_emits_quality_reason() {
2402 use crate::dlq::DlqConfig;
2406 use crate::observability::decorator::source_tests::{LOCK, snapshotter};
2407 use crate::quality::{CompiledQuality, OnFailure, QualitySpec, RecordCheck};
2408 use metrics_util::debugging::DebugValue;
2409
2410 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
2411 let snap = snapshotter();
2412
2413 let main = Arc::new(MockSink::new());
2416 let dlq_sink = Arc::new(MockSink::new());
2417 let spec = QualitySpec {
2418 record: vec![RecordCheck::NotNull {
2419 field: "id".into(),
2420 treat_missing_as_null: true,
2421 on_failure: OnFailure::Quarantine,
2422 }],
2423 batch: vec![],
2424 };
2425 let quality = Arc::new(CompiledQuality::compile(&spec).unwrap());
2426 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2427 records: vec![json!({"id": 1}), json!({"id": null}), json!({"id": 3})],
2428 bookmark: None,
2429 })];
2430 let opts = RunStreamOptions::new()
2431 .with_name("p_quality_reason")
2432 .with_dlq(DlqConfig::new(dlq_sink.clone()))
2433 .with_quality(quality);
2434 let _ = run_stream(futures::stream::iter(pages), main.as_ref(), opts)
2435 .await
2436 .unwrap();
2437
2438 let snapshot = snap.snapshot();
2439 let saw_quality_reason = snapshot.into_vec().into_iter().any(|(k, _, _, v)| {
2440 k.key().name() == "faucet_sink_dlq_pages_total"
2441 && k.key()
2442 .labels()
2443 .any(|l| l.key() == "pipeline" && l.value() == "p_quality_reason")
2444 && k.key()
2445 .labels()
2446 .any(|l| l.key() == "reason" && l.value() == "quality")
2447 && matches!(v, DebugValue::Counter(c) if c >= 1)
2448 });
2449 assert!(
2450 saw_quality_reason,
2451 "expected faucet_sink_dlq_pages_total with reason=\"quality\""
2452 );
2453 }
2454
2455 #[cfg(feature = "quality")]
2456 #[tokio::test]
2457 async fn quality_abort_fails_run() {
2458 use crate::quality::{BatchCheck, CompiledQuality, OnFailure, QualitySpec};
2459 let main = MockSink::new();
2460 let spec = QualitySpec {
2461 record: vec![],
2462 batch: vec![BatchCheck::RowCount {
2463 min: Some(5),
2464 max: None,
2465 on_failure: OnFailure::Abort,
2466 }],
2467 };
2468 let quality = Arc::new(CompiledQuality::compile(&spec).unwrap());
2469 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2470 records: vec![json!({"id": 1})],
2471 bookmark: None,
2472 })];
2473 let opts = RunStreamOptions::new().with_quality(quality);
2474 let result = run_stream(futures::stream::iter(pages), &main, opts).await;
2475 assert!(matches!(result, Err(FaucetError::QualityFailure { .. })));
2476 }
2477
2478 #[cfg(feature = "quality")]
2479 #[tokio::test]
2480 async fn quality_quarantine_without_dlq_is_rejected() {
2481 use crate::quality::{CompiledQuality, OnFailure, QualitySpec, RecordCheck};
2482 let main = MockSink::new();
2483 let spec = QualitySpec {
2484 record: vec![RecordCheck::NotNull {
2485 field: "id".into(),
2486 treat_missing_as_null: true,
2487 on_failure: OnFailure::Quarantine,
2488 }],
2489 batch: vec![],
2490 };
2491 let quality = Arc::new(CompiledQuality::compile(&spec).unwrap());
2492 let pages: Vec<Result<StreamPage, FaucetError>> = vec![Ok(StreamPage {
2493 records: vec![json!({"id": null})],
2494 bookmark: None,
2495 })];
2496 let opts = RunStreamOptions::new().with_quality(quality);
2498 let result = run_stream(futures::stream::iter(pages), &main, opts).await;
2499 assert!(matches!(result, Err(FaucetError::Config(_))));
2500 }
2501
2502 struct FlakySink {
2505 every: usize,
2506 calls: std::sync::Mutex<Vec<usize>>,
2507 }
2508 impl FlakySink {
2509 fn new(every: usize) -> Self {
2510 Self {
2511 every,
2512 calls: std::sync::Mutex::new(Vec::new()),
2513 }
2514 }
2515 fn call_sizes(&self) -> Vec<usize> {
2516 self.calls.lock().unwrap().clone()
2517 }
2518 }
2519 #[async_trait]
2520 impl Sink for FlakySink {
2521 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
2522 Ok(records.len())
2523 }
2524 async fn write_batch_partial(
2525 &self,
2526 records: &[Value],
2527 ) -> Result<Vec<crate::RowOutcome>, FaucetError> {
2528 self.calls.lock().unwrap().push(records.len());
2529 Ok(records
2530 .iter()
2531 .enumerate()
2532 .map(|(i, _)| {
2533 if (i + 1) % self.every == 0 {
2534 Err(FaucetError::Sink("synthetic".into()))
2535 } else {
2536 Ok(())
2537 }
2538 })
2539 .collect())
2540 }
2541 }
2542
2543 #[tokio::test]
2544 async fn adaptive_shrinks_under_errors_on_dlq_path() {
2545 use crate::adaptive::AdaptiveBatchConfig;
2546 use crate::dlq::{DlqConfig, OnBatchError};
2547 let mk = || StreamPage {
2552 records: (0..400).map(|i| json!({"i": i})).collect(),
2553 bookmark: None,
2554 };
2555 let stream = futures::stream::iter(vec![Ok(mk()), Ok(mk()), Ok(mk())]);
2556 let sink = FlakySink::new(4); let dlq_sink: Arc<dyn Sink> = Arc::new(MockSink::new());
2558 let dlq = DlqConfig {
2559 sink: dlq_sink,
2560 on_batch_error: OnBatchError::Propagate,
2561 max_failures_per_page: None,
2562 max_failures_total: None,
2563 include_original_payload: true,
2564 };
2565 let cfg: AdaptiveBatchConfig = serde_json::from_value(json!({
2566 "enabled": true, "min": 50, "max": 400,
2567 "decrease_factor": 0.5, "cooldown_batches": 0, "error_threshold": 0.1
2568 }))
2569 .unwrap();
2570 let opts = RunStreamOptions::new().with_dlq(dlq).with_adaptive(cfg);
2571 let result = run_stream(stream, &sink, opts).await.unwrap();
2572 assert!(
2579 result.records_written >= 900,
2580 "expected ≥900 written, got {}",
2581 result.records_written
2582 );
2583 let sizes = sink.call_sizes();
2584 assert_eq!(sizes[0], 400, "first chunk is the full page");
2585 assert!(
2586 sizes.last().unwrap() < &400,
2587 "controller should shrink under errors: {sizes:?}"
2588 );
2589 assert!(
2590 result.dlq.unwrap().records_dlq >= 250,
2591 "expected ≥250 DLQ records"
2592 );
2593 }
2594
2595 struct RecordingSink {
2600 calls: std::sync::Mutex<Vec<usize>>,
2601 latency: std::time::Duration,
2602 }
2603 impl RecordingSink {
2604 fn new(latency_ms: u64) -> Self {
2605 Self {
2606 calls: std::sync::Mutex::new(Vec::new()),
2607 latency: std::time::Duration::from_millis(latency_ms),
2608 }
2609 }
2610 fn call_sizes(&self) -> Vec<usize> {
2611 self.calls.lock().unwrap().clone()
2612 }
2613 }
2614 #[async_trait]
2615 impl Sink for RecordingSink {
2616 async fn write_batch(&self, records: &[Value]) -> Result<usize, FaucetError> {
2617 tokio::time::sleep(self.latency).await;
2618 self.calls.lock().unwrap().push(records.len());
2619 Ok(records.len())
2620 }
2621 }
2622
2623 #[tokio::test]
2624 async fn adaptive_reslices_non_dlq_page_into_subbatches() {
2625 use crate::adaptive::AdaptiveBatchConfig;
2626 let page = StreamPage {
2627 records: (0..1000).map(|i| json!({ "i": i })).collect(),
2628 bookmark: None,
2629 };
2630 let stream = futures::stream::iter(vec![Ok(page)]);
2631 let sink = RecordingSink::new(0);
2632 let cfg: AdaptiveBatchConfig =
2633 serde_json::from_value(json!({"enabled": true, "min": 100, "max": 1000})).unwrap();
2634 let result = run_stream(stream, &sink, RunStreamOptions::new().with_adaptive(cfg))
2635 .await
2636 .unwrap();
2637 assert_eq!(result.records_written, 1000);
2638 assert_eq!(sink.call_sizes(), vec![1000]);
2640 }
2641
2642 #[tokio::test]
2643 async fn adaptive_shrinks_under_latency_target_then_smaller_chunks() {
2644 use crate::adaptive::AdaptiveBatchConfig;
2645 let mk = || StreamPage {
2646 records: (0..400).map(|i| json!({"i": i})).collect(),
2647 bookmark: None,
2648 };
2649 let stream = futures::stream::iter(vec![Ok(mk()), Ok(mk()), Ok(mk())]);
2650 let sink = RecordingSink::new(50);
2651 let cfg: AdaptiveBatchConfig = serde_json::from_value(json!({
2652 "enabled": true, "min": 50, "max": 400,
2653 "decrease_factor": 0.5, "cooldown_batches": 0,
2654 "target_latency_ms": 10, "latency_window": 1
2655 }))
2656 .unwrap();
2657 let result = run_stream(stream, &sink, RunStreamOptions::new().with_adaptive(cfg))
2658 .await
2659 .unwrap();
2660 assert_eq!(result.records_written, 1200);
2661 let sizes = sink.call_sizes();
2662 assert_eq!(sizes[0], 400);
2663 assert!(
2664 sizes.last().unwrap() < &400,
2665 "controller should have shrunk: {sizes:?}"
2666 );
2667 }
2668
2669 #[tokio::test]
2670 #[allow(clippy::await_holding_lock)]
2671 async fn adaptive_emits_batch_size_and_adjustments_metrics() {
2672 use crate::adaptive::AdaptiveBatchConfig;
2675 use crate::observability::decorator::source_tests::{LOCK, snapshotter};
2676 use metrics_util::debugging::DebugValue;
2677
2678 let _g = LOCK.lock().unwrap_or_else(|e| e.into_inner());
2679 let snap = snapshotter();
2680
2681 let mk = || StreamPage {
2687 records: (0..400).map(|i| json!({"i": i})).collect(),
2688 bookmark: None,
2689 };
2690 let stream = futures::stream::iter(vec![Ok(mk()), Ok(mk()), Ok(mk())]);
2691 let sink = RecordingSink::new(50);
2692 let cfg: AdaptiveBatchConfig = serde_json::from_value(json!({
2693 "enabled": true, "min": 50, "max": 400,
2694 "decrease_factor": 0.5, "cooldown_batches": 0,
2695 "target_latency_ms": 10, "latency_window": 1
2696 }))
2697 .unwrap();
2698
2699 let _ = run_stream(
2700 stream,
2701 &sink,
2702 RunStreamOptions::new()
2703 .with_adaptive(cfg)
2704 .with_name("p")
2705 .with_row("r"),
2706 )
2707 .await
2708 .unwrap();
2709
2710 let snapshot = snap.snapshot();
2711 let mut saw_batch_size = false;
2712 let mut saw_adjustments = false;
2713 for (k, _u, _d, v) in snapshot.into_vec() {
2714 let key = k.key();
2715 let labels = key.labels().collect::<Vec<_>>();
2716 let has = |k: &str, val: &str| labels.iter().any(|l| l.key() == k && l.value() == val);
2717
2718 if key.name() == "faucet_pipeline_adaptive_batch_size"
2719 && has("pipeline", "p")
2720 && has("row", "r")
2721 && matches!(v, DebugValue::Gauge(_))
2722 {
2723 saw_batch_size = true;
2724 }
2725 if key.name() == "faucet_pipeline_adaptive_batch_adjustments_total"
2726 && has("pipeline", "p")
2727 && has("row", "r")
2728 && matches!(v, DebugValue::Counter(c) if c >= 1)
2729 {
2730 saw_adjustments = true;
2731 }
2732 }
2733 assert!(
2734 saw_batch_size,
2735 "expected faucet_pipeline_adaptive_batch_size gauge with pipeline=p, row=r"
2736 );
2737 assert!(
2738 saw_adjustments,
2739 "expected faucet_pipeline_adaptive_batch_adjustments_total counter with pipeline=p, row=r"
2740 );
2741 }
2742}