1use crate::{
2 integrations::{
3 map_integration_result, run_with_contract, AdapterCallContract, BigQueryAdapter,
4 ClickHouseAdapter, DataWindowRequest, OpenSearchAdapter, QueryContext, RetryPolicy,
5 SearchRequest, SingleStoreAdapter, SqlCommand,
6 },
7 DataError, DataResult, Row, WireFormatProfile,
8};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::{
12 collections::{BTreeMap, HashMap, HashSet},
13 fs::{self, OpenOptions},
14 io::{BufRead, BufReader, Write},
15 path::{Path, PathBuf},
16 sync::{
17 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
18 Arc, Mutex,
19 },
20 thread,
21 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
22};
23
/// How a pipeline is expected to be started; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineTrigger {
    /// Started explicitly by an operator or API call.
    Manual,
    /// Started by a scheduler.
    Scheduled,
    /// Started in response to an external event.
    EventDriven,
}
31
/// Static definition of a pipeline: where data comes from, how it is
/// transformed, where it lands, and the run-time budgets that bound a run.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Pipeline {
    pub id: String,
    pub name: String,
    /// Logical name of the configured source (informational; the runtime is
    /// wired to a concrete `Source` separately).
    pub source: String,
    /// Logical name of the configured transform.
    pub transform: String,
    /// Logical name of the configured sink.
    pub sink: String,
    pub trigger: PipelineTrigger,
    /// Rows requested per source batch (clamped to >= 1 by the builder).
    pub batch_size: usize,
    /// Optional per-run batch budget; when hit, the run pauses rather than fails.
    pub max_batches_per_run: Option<usize>,
    /// Wall-clock budget for a single run; exceeding it fails the run.
    pub deadline_ms: u64,
    /// Retry behavior applied per stage (source/transform/sink).
    pub retry_policy: RetryPolicy,
    /// Free-form key/value annotations.
    pub metadata: BTreeMap<String, String>,
}
46
47impl Pipeline {
48 pub fn new(
49 id: impl Into<String>,
50 name: impl Into<String>,
51 source: impl Into<String>,
52 transform: impl Into<String>,
53 sink: impl Into<String>,
54 ) -> Self {
55 Self {
56 id: id.into(),
57 name: name.into(),
58 source: source.into(),
59 transform: transform.into(),
60 sink: sink.into(),
61 trigger: PipelineTrigger::Manual,
62 batch_size: 500,
63 max_batches_per_run: None,
64 deadline_ms: 30_000,
65 retry_policy: RetryPolicy::conservative(),
66 metadata: BTreeMap::new(),
67 }
68 }
69
70 pub fn with_trigger(mut self, trigger: PipelineTrigger) -> Self {
71 self.trigger = trigger;
72 self
73 }
74
75 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
76 self.batch_size = batch_size.max(1);
77 self
78 }
79
80 pub fn with_max_batches_per_run(mut self, max_batches_per_run: Option<usize>) -> Self {
81 self.max_batches_per_run = max_batches_per_run.map(|value| value.max(1));
82 self
83 }
84
85 pub fn with_deadline_ms(mut self, deadline_ms: u64) -> Self {
86 self.deadline_ms = deadline_ms.max(1);
87 self
88 }
89
90 pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
91 self.retry_policy = RetryPolicy {
92 max_attempts: retry_policy.max_attempts.max(1),
93 initial_backoff_ms: retry_policy.initial_backoff_ms,
94 max_backoff_ms: retry_policy
95 .max_backoff_ms
96 .max(retry_policy.initial_backoff_ms),
97 };
98 self
99 }
100
101 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
102 self.metadata.insert(key.into(), value.into());
103 self
104 }
105}
106
/// The concrete invocation that starts a run; serialized with an internal
/// `kind` tag in snake_case.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum PipelineRunCommand {
    /// Operator-initiated run; `requested_by` is free-form attribution.
    Manual { requested_by: Option<String> },
    /// Scheduler-initiated run, keyed by the schedule that fired.
    Scheduled { schedule_id: String },
    /// Event-driven run carrying the triggering event name and payload.
    Event { event: String, payload: Value },
}
114
115impl PipelineRunCommand {
116 fn as_label(&self) -> &'static str {
117 match self {
118 Self::Manual { .. } => "manual",
119 Self::Scheduled { .. } => "scheduled",
120 Self::Event { .. } => "event",
121 }
122 }
123}
124
/// Lifecycle state of one pipeline run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineRunStatus {
    /// Created but not yet executing.
    Queued,
    /// Currently processing batches.
    Running,
    /// Stopped early (operator pause or batch budget); resumable via checkpoint.
    Paused,
    /// Drained the source successfully.
    Succeeded,
    /// Terminated by an unrecoverable error or deadline.
    Failed,
    /// Terminated by operator cancel.
    Canceled,
}
135
/// The pipeline stage in which a retry or dead letter occurred.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineStage {
    Source,
    Transform,
    Sink,
}
143
/// Mutable record of one pipeline execution: status, timing, cumulative
/// counters, checkpoint progress, and accumulated error messages.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineRun {
    pub id: String,
    pub pipeline_id: String,
    /// The command that started this run.
    pub command: PipelineRunCommand,
    pub status: PipelineRunStatus,
    /// Copied from the originating `QueryContext`.
    pub tenant_id: Option<String>,
    /// Copied from the originating `QueryContext`.
    pub trace_id: Option<String>,
    pub started_at_unix_ms: u64,
    /// Set by `finish`, or backfilled at the end of `execute`.
    pub finished_at_unix_ms: Option<u64>,
    /// The checkpoint the run resumed from, if one was stored.
    pub resumed_from_checkpoint: Option<String>,
    /// The most recently persisted checkpoint.
    pub last_checkpoint: Option<String>,
    pub rows_read: u64,
    pub rows_transformed: u64,
    pub rows_written: u64,
    pub rows_dead_letter: u64,
    /// Rows the sink reported as already-written (idempotency hits).
    pub duplicate_writes: u64,
    /// Total retry attempts across all stages.
    pub retries: u64,
    /// Lineage tag from the most recent source batch.
    pub latest_lineage: Option<String>,
    /// `now - source freshness` from the most recent source batch.
    pub latest_freshness_lag_ms: Option<u64>,
    /// Error messages accumulated during the run.
    pub errors: Vec<String>,
}
166
impl PipelineRun {
    /// Creates a queued run snapshot, copying tenant/trace identifiers from
    /// the query context and zeroing every counter.
    fn new(
        id: String,
        pipeline_id: String,
        command: PipelineRunCommand,
        context: &QueryContext,
    ) -> Self {
        Self {
            id,
            pipeline_id,
            command,
            status: PipelineRunStatus::Queued,
            tenant_id: context.tenant_id.clone(),
            trace_id: context.trace_id.clone(),
            started_at_unix_ms: now_unix_ms(),
            finished_at_unix_ms: None,
            resumed_from_checkpoint: None,
            last_checkpoint: None,
            rows_read: 0,
            rows_transformed: 0,
            rows_written: 0,
            rows_dead_letter: 0,
            duplicate_writes: 0,
            retries: 0,
            latest_lineage: None,
            latest_freshness_lag_ms: None,
            errors: Vec::new(),
        }
    }

    /// Records the terminal (or paused) status and stamps the finish time.
    fn finish(&mut self, status: PipelineRunStatus) {
        self.status = status;
        self.finished_at_unix_ms = Some(now_unix_ms());
    }
}
202
/// Category of an observability event emitted during a run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineEventKind {
    RunQueued,
    RunStarted,
    /// Run resumed from a previously stored checkpoint.
    RunResumed,
    BatchRead,
    BatchTransformed,
    BatchWritten,
    CheckpointSaved,
    /// A stage failed with a retryable error and will be attempted again.
    Retrying,
    DeadLettered,
    RunPaused,
    RunCanceled,
    RunSucceeded,
    RunFailed,
}
220
/// One structured observability event appended to the event store.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineEvent {
    pub run_id: String,
    pub pipeline_id: String,
    pub timestamp_unix_ms: u64,
    pub kind: PipelineEventKind,
    /// Human-readable description of what happened.
    pub message: String,
    pub tenant_id: Option<String>,
    pub trace_id: Option<String>,
    pub correlation_id: Option<String>,
    pub request_id: Option<String>,
    /// Numeric measurements (row counts, attempt numbers, backoff, ...).
    pub metrics: BTreeMap<String, u64>,
    /// String context (stage, checkpoint, error text, ...).
    pub metadata: BTreeMap<String, String>,
}
235
/// One window of records read from a `Source`, plus pagination and
/// provenance information.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineBatch {
    pub records: Vec<Row>,
    /// Checkpoint to persist once this batch is fully processed.
    pub next_checkpoint: Option<String>,
    /// Whether the source has further batches beyond this one.
    pub has_more: bool,
    /// Optional provenance tag (e.g. `file://...`) for lineage tracking.
    pub lineage: Option<String>,
    /// When the source data was last produced/modified, if known.
    pub source_freshness_unix_ms: Option<u64>,
}
244
impl PipelineBatch {
    /// A batch with no records, no checkpoint advance, and nothing more to read.
    pub fn empty() -> Self {
        Self {
            records: Vec::new(),
            next_checkpoint: None,
            has_more: false,
            lineage: None,
            source_freshness_unix_ms: None,
        }
    }
}
256
/// A record a stage could not process, before run-level context is attached.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineDeadLetterInput {
    /// Stage that rejected the record.
    pub stage: PipelineStage,
    pub reason: String,
    pub record: Row,
}
263
/// Output of a transform: records to forward to the sink plus any records
/// rejected to the dead-letter store.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TransformBatch {
    pub records: Vec<Row>,
    pub dead_letters: Vec<PipelineDeadLetterInput>,
}
269
impl TransformBatch {
    /// Wraps records unchanged with no dead letters.
    pub fn passthrough(records: Vec<Row>) -> Self {
        Self {
            records,
            dead_letters: Vec::new(),
        }
    }
}
278
/// Outcome of a sink write. `Default` yields an all-zero/empty result, used
/// when there is nothing to write or the write was dead-lettered.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct SinkWriteResult {
    /// Rows newly written by this call.
    pub written: usize,
    /// Rows skipped because the sink had already seen them (idempotency).
    pub duplicate_writes: usize,
    /// Rows the sink rejected.
    pub dead_letters: Vec<PipelineDeadLetterInput>,
}
285
/// A dead-lettered record with full run context, as persisted in the
/// dead-letter store.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineDeadLetter {
    pub run_id: String,
    pub pipeline_id: String,
    pub stage: PipelineStage,
    pub reason: String,
    pub record: Row,
    /// Checkpoint in effect when the record was rejected.
    pub checkpoint: Option<String>,
    /// Attempt number at which the record was given up on.
    pub attempt: u32,
    pub timestamp_unix_ms: u64,
}
297
/// A paginated record producer. Implementations return one window per call
/// and advance via the opaque `checkpoint` string they themselves emit.
pub trait Source: Send + Sync {
    /// Human-readable name for logs and lineage.
    fn source_name(&self) -> &str;
    /// Reads the next window starting at `checkpoint` (None = from the start).
    fn read_batch(
        &self,
        pipeline: &Pipeline,
        checkpoint: Option<&str>,
        context: &QueryContext,
    ) -> DataResult<PipelineBatch>;
}
307
/// A per-batch record transformer. May drop records into dead letters
/// instead of failing the whole batch.
pub trait Transform: Send + Sync {
    /// Human-readable name for logs and lineage.
    fn transform_name(&self) -> &str;
    /// Transforms one batch; `run` is a read-only snapshot of progress so far.
    fn transform_batch(
        &self,
        pipeline: &Pipeline,
        run: &PipelineRun,
        records: Vec<Row>,
        context: &QueryContext,
    ) -> DataResult<TransformBatch>;
}
318
/// No-op transform: forwards every record unchanged.
#[derive(Debug, Clone, Copy, Default)]
pub struct PassthroughTransform;
321
322impl Transform for PassthroughTransform {
323 fn transform_name(&self) -> &str {
324 "passthrough"
325 }
326
327 fn transform_batch(
328 &self,
329 _pipeline: &Pipeline,
330 _run: &PipelineRun,
331 records: Vec<Row>,
332 _context: &QueryContext,
333 ) -> DataResult<TransformBatch> {
334 Ok(TransformBatch::passthrough(records))
335 }
336}
337
/// A batch writer. Implementations report written/duplicate counts and may
/// reject individual rows as dead letters.
pub trait Sink: Send + Sync {
    /// Human-readable name for logs and lineage.
    fn sink_name(&self) -> &str;
    /// Writes one batch; `run` is a read-only snapshot of progress so far.
    fn write_batch(
        &self,
        pipeline: &Pipeline,
        run: &PipelineRun,
        records: &[Row],
        context: &QueryContext,
    ) -> DataResult<SinkWriteResult>;
}
348
/// Persistence for per-pipeline, per-tenant resume checkpoints.
pub trait CheckpointStore: Send + Sync {
    /// Returns the stored checkpoint, if any.
    fn load_checkpoint(
        &self,
        pipeline_id: &str,
        tenant_id: Option<&str>,
    ) -> DataResult<Option<String>>;
    /// Stores (overwriting) the checkpoint.
    fn save_checkpoint(
        &self,
        pipeline_id: &str,
        tenant_id: Option<&str>,
        checkpoint: &str,
    ) -> DataResult<()>;
    /// Deletes any stored checkpoint.
    fn clear_checkpoint(&self, pipeline_id: &str, tenant_id: Option<&str>) -> DataResult<()>;
}
363
/// Persistence for records the pipeline could not process.
pub trait DeadLetterStore: Send + Sync {
    fn push_dead_letter(&self, dead_letter: PipelineDeadLetter) -> DataResult<()>;
    /// Lists dead letters belonging to one run.
    fn list_dead_letters(&self, run_id: &str) -> DataResult<Vec<PipelineDeadLetter>>;
}
368
/// Append-only sink for run observability events.
pub trait PipelineEventStore: Send + Sync {
    fn append_event(&self, event: PipelineEvent) -> DataResult<()>;
    /// Lists events belonging to one run.
    fn list_events(&self, run_id: &str) -> DataResult<Vec<PipelineEvent>>;
}
373
/// Thread-safe in-memory checkpoint store keyed by `tenant::pipeline`.
/// Clones share the same underlying map via `Arc`.
#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
    inner: Arc<Mutex<HashMap<String, String>>>,
}
378
379impl InMemoryCheckpointStore {
380 fn key(pipeline_id: &str, tenant_id: Option<&str>) -> String {
381 format!("{}::{}", tenant_id.unwrap_or("-"), pipeline_id)
382 }
383}
384
385impl CheckpointStore for InMemoryCheckpointStore {
386 fn load_checkpoint(
387 &self,
388 pipeline_id: &str,
389 tenant_id: Option<&str>,
390 ) -> DataResult<Option<String>> {
391 let key = Self::key(pipeline_id, tenant_id);
392 let guard = self
393 .inner
394 .lock()
395 .map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
396 Ok(guard.get(&key).cloned())
397 }
398
399 fn save_checkpoint(
400 &self,
401 pipeline_id: &str,
402 tenant_id: Option<&str>,
403 checkpoint: &str,
404 ) -> DataResult<()> {
405 let key = Self::key(pipeline_id, tenant_id);
406 let mut guard = self
407 .inner
408 .lock()
409 .map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
410 guard.insert(key, checkpoint.to_string());
411 Ok(())
412 }
413
414 fn clear_checkpoint(&self, pipeline_id: &str, tenant_id: Option<&str>) -> DataResult<()> {
415 let key = Self::key(pipeline_id, tenant_id);
416 let mut guard = self
417 .inner
418 .lock()
419 .map_err(|_| DataError::Integration("checkpoint store lock poisoned".to_string()))?;
420 guard.remove(&key);
421 Ok(())
422 }
423}
424
/// Thread-safe in-memory dead-letter store; clones share the same list.
#[derive(Debug, Clone, Default)]
pub struct InMemoryDeadLetterStore {
    inner: Arc<Mutex<Vec<PipelineDeadLetter>>>,
}
429
430impl DeadLetterStore for InMemoryDeadLetterStore {
431 fn push_dead_letter(&self, dead_letter: PipelineDeadLetter) -> DataResult<()> {
432 let mut guard = self
433 .inner
434 .lock()
435 .map_err(|_| DataError::Integration("dead-letter store lock poisoned".to_string()))?;
436 guard.push(dead_letter);
437 Ok(())
438 }
439
440 fn list_dead_letters(&self, run_id: &str) -> DataResult<Vec<PipelineDeadLetter>> {
441 let guard = self
442 .inner
443 .lock()
444 .map_err(|_| DataError::Integration("dead-letter store lock poisoned".to_string()))?;
445 Ok(guard
446 .iter()
447 .filter(|entry| entry.run_id == run_id)
448 .cloned()
449 .collect())
450 }
451}
452
/// Thread-safe in-memory event store; clones share the same list.
#[derive(Debug, Clone, Default)]
pub struct InMemoryPipelineEventStore {
    inner: Arc<Mutex<Vec<PipelineEvent>>>,
}
457
458impl PipelineEventStore for InMemoryPipelineEventStore {
459 fn append_event(&self, event: PipelineEvent) -> DataResult<()> {
460 let mut guard = self.inner.lock().map_err(|_| {
461 DataError::Integration("pipeline event store lock poisoned".to_string())
462 })?;
463 guard.push(event);
464 Ok(())
465 }
466
467 fn list_events(&self, run_id: &str) -> DataResult<Vec<PipelineEvent>> {
468 let guard = self.inner.lock().map_err(|_| {
469 DataError::Integration("pipeline event store lock poisoned".to_string())
470 })?;
471 Ok(guard
472 .iter()
473 .filter(|event| event.run_id == run_id)
474 .cloned()
475 .collect())
476 }
477}
478
/// Operator control handle shared with a running pipeline. Clones share the
/// same flags, so a clone held by an operator can pause/cancel a run in
/// flight; the runtime checks the flags at each batch boundary.
#[derive(Debug, Clone, Default)]
pub struct PipelineControl {
    paused: Arc<AtomicBool>,
    canceled: Arc<AtomicBool>,
}
484
impl PipelineControl {
    /// Requests a pause; the runtime honors it at the next batch boundary.
    pub fn pause(&self) {
        self.paused.store(true, Ordering::SeqCst);
    }

    /// Clears a previous pause request.
    pub fn resume(&self) {
        self.paused.store(false, Ordering::SeqCst);
    }

    /// Requests cancellation; the runtime honors it at the next batch boundary.
    pub fn cancel(&self) {
        self.canceled.store(true, Ordering::SeqCst);
    }

    pub fn is_paused(&self) -> bool {
        self.paused.load(Ordering::SeqCst)
    }

    pub fn is_canceled(&self) -> bool {
        self.canceled.load(Ordering::SeqCst)
    }
}
506
/// Executes one pipeline definition end to end: reads batches from the
/// source, transforms them, writes to the sink, and persists checkpoints,
/// dead letters, and observability events along the way.
pub struct PipelineRuntime {
    pipeline: Pipeline,
    source: Arc<dyn Source>,
    transform: Arc<dyn Transform>,
    sink: Arc<dyn Sink>,
    checkpoint_store: Arc<dyn CheckpointStore>,
    dead_letter_store: Arc<dyn DeadLetterStore>,
    event_store: Arc<dyn PipelineEventStore>,
}
516
impl PipelineRuntime {
    /// Wires a pipeline definition to concrete source/transform/sink
    /// implementations and the stores used for checkpoints, dead letters,
    /// and events.
    pub fn new(
        pipeline: Pipeline,
        source: Arc<dyn Source>,
        transform: Arc<dyn Transform>,
        sink: Arc<dyn Sink>,
        checkpoint_store: Arc<dyn CheckpointStore>,
        dead_letter_store: Arc<dyn DeadLetterStore>,
        event_store: Arc<dyn PipelineEventStore>,
    ) -> Self {
        Self {
            pipeline,
            source,
            transform,
            sink,
            checkpoint_store,
            dead_letter_store,
            event_store,
        }
    }

    /// Convenience wrapper: runs with a `Manual` command.
    pub fn execute_manual(
        &self,
        context: QueryContext,
        control: PipelineControl,
        requested_by: Option<String>,
    ) -> DataResult<PipelineRun> {
        self.execute(
            PipelineRunCommand::Manual { requested_by },
            context,
            control,
        )
    }

    /// Convenience wrapper: runs with a `Scheduled` command.
    pub fn execute_scheduled(
        &self,
        context: QueryContext,
        control: PipelineControl,
        schedule_id: impl Into<String>,
    ) -> DataResult<PipelineRun> {
        self.execute(
            PipelineRunCommand::Scheduled {
                schedule_id: schedule_id.into(),
            },
            context,
            control,
        )
    }

    /// Convenience wrapper: runs with an `Event` command.
    pub fn execute_event(
        &self,
        context: QueryContext,
        control: PipelineControl,
        event: impl Into<String>,
        payload: Value,
    ) -> DataResult<PipelineRun> {
        self.execute(
            PipelineRunCommand::Event {
                event: event.into(),
                payload,
            },
            context,
            control,
        )
    }

    /// Runs the pipeline until the source is drained, a budget is exhausted,
    /// or the operator pauses/cancels. Returns the final `PipelineRun`
    /// snapshot with `Ok` even when the run itself failed (check
    /// `run.status`); `Err` means the event/checkpoint/dead-letter stores
    /// themselves failed.
    pub fn execute(
        &self,
        command: PipelineRunCommand,
        context: QueryContext,
        control: PipelineControl,
    ) -> DataResult<PipelineRun> {
        let started = Instant::now();
        // Run ids combine wall-clock time with a process-wide counter.
        let run_id = format!("plr-{}-{}", now_unix_ms(), next_counter());
        let mut run = PipelineRun::new(
            run_id.clone(),
            self.pipeline.id.clone(),
            command.clone(),
            &context,
        );
        self.emit_event(
            &run,
            &context,
            PipelineEventKind::RunQueued,
            "pipeline run queued",
            BTreeMap::new(),
            BTreeMap::new(),
        )?;

        run.status = PipelineRunStatus::Running;
        self.emit_event(
            &run,
            &context,
            PipelineEventKind::RunStarted,
            "pipeline run started",
            BTreeMap::new(),
            BTreeMap::from([("command".to_string(), command.as_label().to_string())]),
        )?;

        // Resume from a previously persisted checkpoint, if one exists for
        // this pipeline/tenant pair.
        let tenant_id = context.tenant_id.as_deref();
        let mut checkpoint = self
            .checkpoint_store
            .load_checkpoint(&self.pipeline.id, tenant_id)?;
        if checkpoint.is_some() {
            run.resumed_from_checkpoint = checkpoint.clone();
            self.emit_event(
                &run,
                &context,
                PipelineEventKind::RunResumed,
                "resuming from stored checkpoint",
                BTreeMap::new(),
                BTreeMap::from([(
                    "checkpoint".to_string(),
                    checkpoint.clone().unwrap_or_default(),
                )]),
            )?;
        }

        let mut batches_processed = 0usize;
        loop {
            // Operator controls and budgets are checked once per batch.
            if control.is_canceled() {
                run.finish(PipelineRunStatus::Canceled);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunCanceled,
                    "pipeline run canceled by operator control",
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }
            if control.is_paused() {
                run.finish(PipelineRunStatus::Paused);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunPaused,
                    "pipeline run paused by operator control",
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }
            if started.elapsed() > Duration::from_millis(self.pipeline.deadline_ms.max(1)) {
                let message = format!(
                    "pipeline run deadline exceeded ({}ms)",
                    self.pipeline.deadline_ms
                );
                run.errors.push(message.clone());
                run.finish(PipelineRunStatus::Failed);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunFailed,
                    &message,
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }
            // Batch budget pauses (rather than fails) so a later run can
            // resume from the saved checkpoint.
            if let Some(max_batches) = self.pipeline.max_batches_per_run {
                if batches_processed >= max_batches {
                    run.finish(PipelineRunStatus::Paused);
                    self.emit_event(
                        &run,
                        &context,
                        PipelineEventKind::RunPaused,
                        "pipeline run paused at max_batches_per_run budget",
                        BTreeMap::from([("max_batches".to_string(), max_batches as u64)]),
                        BTreeMap::new(),
                    )?;
                    break;
                }
            }

            // --- Source stage (retried): a final failure fails the run. ---
            let source_batch =
                match self.with_retry(PipelineStage::Source, &context, &mut run, |attempt| {
                    self.source
                        .read_batch(&self.pipeline, checkpoint.as_deref(), &context)
                        .map_err(|err| {
                            DataError::Integration(format!(
                                "source read failed at attempt {}: {}",
                                attempt, err
                            ))
                        })
                }) {
                    Ok(batch) => batch,
                    Err(err) => {
                        let message = err.to_string();
                        run.errors.push(message.clone());
                        run.finish(PipelineRunStatus::Failed);
                        self.emit_event(
                            &run,
                            &context,
                            PipelineEventKind::RunFailed,
                            &message,
                            BTreeMap::new(),
                            BTreeMap::new(),
                        )?;
                        break;
                    }
                };

            // Record read-side bookkeeping: counters, lineage, freshness lag.
            let read_count = source_batch.records.len() as u64;
            run.rows_read = run.rows_read.saturating_add(read_count);
            if let Some(lineage) = source_batch.lineage.as_ref() {
                run.latest_lineage = Some(lineage.clone());
            }
            if let Some(source_freshness) = source_batch.source_freshness_unix_ms {
                run.latest_freshness_lag_ms = Some(now_unix_ms().saturating_sub(source_freshness));
            }
            self.emit_event(
                &run,
                &context,
                PipelineEventKind::BatchRead,
                "source batch read complete",
                BTreeMap::from([("rows".to_string(), read_count)]),
                BTreeMap::from([
                    (
                        "checkpoint".to_string(),
                        checkpoint.clone().unwrap_or_else(|| "none".to_string()),
                    ),
                    ("has_more".to_string(), source_batch.has_more.to_string()),
                ]),
            )?;

            // Empty batch with nothing more to read: the source is drained.
            if source_batch.records.is_empty() && !source_batch.has_more {
                run.finish(PipelineRunStatus::Succeeded);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunSucceeded,
                    "pipeline run completed with no additional source records",
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }

            // --- Transform stage (retried): on final failure the whole batch
            // is dead-lettered, the checkpoint still advances, and the run
            // continues if the source has more batches. ---
            let run_snapshot = run.clone();
            let transformed =
                match self.with_retry(PipelineStage::Transform, &context, &mut run, |_attempt| {
                    self.transform.transform_batch(
                        &self.pipeline,
                        &run_snapshot,
                        source_batch.records.clone(),
                        &context,
                    )
                }) {
                    Ok(batch) => batch,
                    Err(err) => {
                        let reason = err.to_string();
                        for record in source_batch.records {
                            self.push_dead_letter(
                                &mut run,
                                PipelineDeadLetterInput {
                                    stage: PipelineStage::Transform,
                                    reason: reason.clone(),
                                    record,
                                },
                                checkpoint.clone(),
                                1,
                            )?;
                        }
                        run.errors.push(reason.clone());
                        // Advance the checkpoint past the poisoned batch so a
                        // resume does not re-read it.
                        if let Some(next_checkpoint) = source_batch.next_checkpoint.clone() {
                            self.checkpoint_store.save_checkpoint(
                                &self.pipeline.id,
                                tenant_id,
                                &next_checkpoint,
                            )?;
                            run.last_checkpoint = Some(next_checkpoint.clone());
                            checkpoint = Some(next_checkpoint.clone());
                            self.emit_event(
                                &run,
                                &context,
                                PipelineEventKind::CheckpointSaved,
                                "checkpoint advanced after transform dead-letter fallback",
                                BTreeMap::new(),
                                BTreeMap::from([("checkpoint".to_string(), next_checkpoint)]),
                            )?;
                        }
                        if source_batch.has_more {
                            batches_processed += 1;
                            continue;
                        }
                        run.finish(PipelineRunStatus::Failed);
                        self.emit_event(
                            &run,
                            &context,
                            PipelineEventKind::RunFailed,
                            "transform stage exhausted and no additional source batches remain",
                            BTreeMap::new(),
                            BTreeMap::new(),
                        )?;
                        break;
                    }
                };

            let transformed_count = transformed.records.len() as u64;
            run.rows_transformed = run.rows_transformed.saturating_add(transformed_count);
            self.emit_event(
                &run,
                &context,
                PipelineEventKind::BatchTransformed,
                "transform batch complete",
                BTreeMap::from([("rows".to_string(), transformed_count)]),
                BTreeMap::new(),
            )?;

            // Records the transform itself rejected.
            for dead_letter in transformed.dead_letters {
                self.push_dead_letter(&mut run, dead_letter, checkpoint.clone(), 1)?;
            }

            // --- Sink stage (retried): on final failure the batch is
            // dead-lettered but the run keeps going (checkpoint below still
            // advances). ---
            let write_result = if transformed.records.is_empty() {
                SinkWriteResult::default()
            } else {
                let run_snapshot = run.clone();
                match self.with_retry(PipelineStage::Sink, &context, &mut run, |_attempt| {
                    self.sink.write_batch(
                        &self.pipeline,
                        &run_snapshot,
                        &transformed.records,
                        &context,
                    )
                }) {
                    Ok(result) => result,
                    Err(err) => {
                        let reason = err.to_string();
                        for record in transformed.records {
                            self.push_dead_letter(
                                &mut run,
                                PipelineDeadLetterInput {
                                    stage: PipelineStage::Sink,
                                    reason: reason.clone(),
                                    record,
                                },
                                checkpoint.clone(),
                                1,
                            )?;
                        }
                        run.errors.push(reason);
                        SinkWriteResult::default()
                    }
                }
            };

            run.rows_written = run.rows_written.saturating_add(write_result.written as u64);
            run.duplicate_writes = run
                .duplicate_writes
                .saturating_add(write_result.duplicate_writes as u64);
            self.emit_event(
                &run,
                &context,
                PipelineEventKind::BatchWritten,
                "sink batch write complete",
                BTreeMap::from([
                    ("written".to_string(), write_result.written as u64),
                    (
                        "duplicate_writes".to_string(),
                        write_result.duplicate_writes as u64,
                    ),
                ]),
                BTreeMap::new(),
            )?;

            // Records the sink itself rejected.
            for dead_letter in write_result.dead_letters {
                self.push_dead_letter(&mut run, dead_letter, checkpoint.clone(), 1)?;
            }

            // Persist progress only after the batch made it through the sink
            // stage (successfully or via dead letters).
            if let Some(next_checkpoint) = source_batch.next_checkpoint.clone() {
                self.checkpoint_store.save_checkpoint(
                    &self.pipeline.id,
                    tenant_id,
                    &next_checkpoint,
                )?;
                run.last_checkpoint = Some(next_checkpoint.clone());
                checkpoint = Some(next_checkpoint.clone());
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::CheckpointSaved,
                    "checkpoint saved",
                    BTreeMap::new(),
                    BTreeMap::from([("checkpoint".to_string(), next_checkpoint)]),
                )?;
            }

            batches_processed = batches_processed.saturating_add(1);

            if !source_batch.has_more {
                run.finish(PipelineRunStatus::Succeeded);
                self.emit_event(
                    &run,
                    &context,
                    PipelineEventKind::RunSucceeded,
                    "pipeline run completed successfully",
                    BTreeMap::new(),
                    BTreeMap::new(),
                )?;
                break;
            }
        }

        // Defensive backstop: every break path above calls finish(), but make
        // sure the snapshot always carries a finish time.
        if run.finished_at_unix_ms.is_none() {
            run.finished_at_unix_ms = Some(now_unix_ms());
        }

        Ok(run)
    }

    /// Bundles a run with its recorded events and dead letters for operators.
    pub fn operation_snapshot(&self, run: &PipelineRun) -> DataResult<PipelineOperationSnapshot> {
        let events = self.event_store.list_events(&run.id)?;
        let dead_letters = self.dead_letter_store.list_dead_letters(&run.id)?;
        Ok(PipelineOperationSnapshot {
            run: run.clone(),
            events,
            dead_letters,
        })
    }

    /// Runs `operation` up to `retry_policy.max_attempts` times, emitting a
    /// `Retrying` event (with stage, attempt, and backoff metrics) for each
    /// retryable failure. Non-retryable errors and the final attempt's error
    /// are returned to the caller.
    ///
    /// NOTE(review): `backoff_ms` is grown exponentially and reported in the
    /// retry event, but the loop never actually sleeps between attempts —
    /// confirm whether a real delay (e.g. `thread::sleep`) is intended here
    /// or whether immediate retries are deliberate for testability.
    fn with_retry<T, F>(
        &self,
        stage: PipelineStage,
        context: &QueryContext,
        run: &mut PipelineRun,
        mut operation: F,
    ) -> DataResult<T>
    where
        F: FnMut(u32) -> DataResult<T>,
    {
        let attempts = self.pipeline.retry_policy.max_attempts.max(1);
        let mut backoff_ms = self.pipeline.retry_policy.initial_backoff_ms;

        for attempt in 1..=attempts {
            match operation(attempt) {
                Ok(value) => return Ok(value),
                // Last attempt or non-retryable error: surface it.
                Err(err) if attempt == attempts || !is_retryable_pipeline_error(&err) => {
                    return Err(err);
                }
                Err(err) => {
                    run.retries = run.retries.saturating_add(1);
                    self.emit_event(
                        run,
                        context,
                        PipelineEventKind::Retrying,
                        "retrying pipeline stage after retryable error",
                        BTreeMap::from([
                            ("attempt".to_string(), attempt as u64),
                            ("max_attempts".to_string(), attempts as u64),
                            ("backoff_ms".to_string(), backoff_ms),
                        ]),
                        BTreeMap::from([
                            (
                                "stage".to_string(),
                                format!("{stage:?}").to_ascii_lowercase(),
                            ),
                            ("error".to_string(), err.to_string()),
                        ]),
                    )?;
                    // Exponential backoff capped by the policy's max.
                    backoff_ms = (backoff_ms.saturating_mul(2))
                        .min(self.pipeline.retry_policy.max_backoff_ms.max(1));
                }
            }
        }
        // Unreachable in practice: the loop always returns on the final
        // attempt; kept as a safety net for `attempts == 0`-style misuse.
        Err(DataError::Integration(
            "retry exhausted without terminal error detail".to_string(),
        ))
    }

    /// Appends one observability event carrying run identity plus
    /// correlation/request ids from the query context.
    fn emit_event(
        &self,
        run: &PipelineRun,
        context: &QueryContext,
        kind: PipelineEventKind,
        message: &str,
        metrics: BTreeMap<String, u64>,
        metadata: BTreeMap<String, String>,
    ) -> DataResult<()> {
        self.event_store.append_event(PipelineEvent {
            run_id: run.id.clone(),
            pipeline_id: run.pipeline_id.clone(),
            timestamp_unix_ms: now_unix_ms(),
            kind,
            message: message.to_string(),
            tenant_id: run.tenant_id.clone(),
            trace_id: run.trace_id.clone(),
            correlation_id: context.correlation_id().map(ToString::to_string),
            request_id: context.request_id().map(ToString::to_string),
            metrics,
            metadata,
        })
    }

    /// Increments the run's dead-letter counter and persists the record with
    /// full run context.
    fn push_dead_letter(
        &self,
        run: &mut PipelineRun,
        input: PipelineDeadLetterInput,
        checkpoint: Option<String>,
        attempt: u32,
    ) -> DataResult<()> {
        run.rows_dead_letter = run.rows_dead_letter.saturating_add(1);
        self.dead_letter_store.push_dead_letter(PipelineDeadLetter {
            run_id: run.id.clone(),
            pipeline_id: run.pipeline_id.clone(),
            stage: input.stage,
            reason: input.reason,
            record: input.record,
            checkpoint,
            attempt,
            timestamp_unix_ms: now_unix_ms(),
        })
    }
}
1032
/// Operator-facing bundle: a run plus all of its events and dead letters.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineOperationSnapshot {
    pub run: PipelineRun,
    pub events: Vec<PipelineEvent>,
    pub dead_letters: Vec<PipelineDeadLetter>,
}
1039
/// Test/demo source serving a fixed vector of rows, paginated by a numeric
/// offset checkpoint.
#[derive(Debug, Clone)]
pub struct InMemoryRecordSource {
    source_name: String,
    records: Vec<Row>,
    /// Optional lineage tag reported with every batch.
    lineage: Option<String>,
    /// Freshness timestamp reported with every batch; defaults to "now".
    freshness_unix_ms: Option<u64>,
}
1047
1048impl InMemoryRecordSource {
1049 pub fn new(source_name: impl Into<String>, records: Vec<Row>) -> Self {
1050 Self {
1051 source_name: source_name.into(),
1052 records,
1053 lineage: None,
1054 freshness_unix_ms: Some(now_unix_ms()),
1055 }
1056 }
1057
1058 pub fn with_lineage(mut self, lineage: impl Into<String>) -> Self {
1059 self.lineage = Some(lineage.into());
1060 self
1061 }
1062
1063 pub fn with_freshness_unix_ms(mut self, freshness_unix_ms: u64) -> Self {
1064 self.freshness_unix_ms = Some(freshness_unix_ms);
1065 self
1066 }
1067}
1068
1069impl Source for InMemoryRecordSource {
1070 fn source_name(&self) -> &str {
1071 &self.source_name
1072 }
1073
1074 fn read_batch(
1075 &self,
1076 pipeline: &Pipeline,
1077 checkpoint: Option<&str>,
1078 _context: &QueryContext,
1079 ) -> DataResult<PipelineBatch> {
1080 let offset = parse_checkpoint_offset(checkpoint);
1081 let limit = pipeline.batch_size.max(1);
1082 let total_rows = self.records.len();
1083 let rows = self
1084 .records
1085 .iter()
1086 .skip(offset)
1087 .take(limit)
1088 .cloned()
1089 .collect::<Vec<_>>();
1090 let next_offset = offset.saturating_add(rows.len());
1091 Ok(PipelineBatch {
1092 records: rows,
1093 next_checkpoint: Some(next_offset.to_string()),
1094 has_more: next_offset < total_rows,
1095 lineage: self.lineage.clone(),
1096 source_freshness_unix_ms: self.freshness_unix_ms,
1097 })
1098 }
1099}
1100
/// Source reading newline-delimited JSON objects from a file, paginated by a
/// numeric offset checkpoint.
#[derive(Debug, Clone)]
pub struct FileJsonLineSource {
    source_name: String,
    path: PathBuf,
}
1106
1107impl FileJsonLineSource {
1108 pub fn new(source_name: impl Into<String>, path: impl Into<PathBuf>) -> Self {
1109 Self {
1110 source_name: source_name.into(),
1111 path: path.into(),
1112 }
1113 }
1114}
1115
impl Source for FileJsonLineSource {
    fn source_name(&self) -> &str {
        &self.source_name
    }

    /// Reads one `batch_size` window of JSON-object lines from the file,
    /// starting at the numeric offset encoded in the checkpoint.
    ///
    /// NOTE(review): every call re-reads and re-parses the ENTIRE file before
    /// slicing out the window, so a full pipeline run is O(file * batches);
    /// a malformed JSON line anywhere in the file (even outside the window)
    /// fails the batch. Confirm whether windowed parsing is acceptable
    /// before changing this.
    fn read_batch(
        &self,
        pipeline: &Pipeline,
        checkpoint: Option<&str>,
        _context: &QueryContext,
    ) -> DataResult<PipelineBatch> {
        // Missing file is treated as an empty, drained source (not an error);
        // the checkpoint is echoed back unchanged.
        if !self.path.exists() {
            return Ok(PipelineBatch {
                records: Vec::new(),
                next_checkpoint: Some(parse_checkpoint_offset(checkpoint).to_string()),
                has_more: false,
                lineage: Some(format!("file://{}", self.path.display())),
                source_freshness_unix_ms: None,
            });
        }

        let file = OpenOptions::new().read(true).open(&self.path)?;
        let reader = BufReader::new(file);
        let mut records = Vec::new();
        for line in reader.lines() {
            let line = line?;
            // Blank lines are skipped rather than counted as rows.
            if line.trim().is_empty() {
                continue;
            }
            let value: Value = serde_json::from_str(&line)?;
            let Value::Object(map) = value else {
                return Err(DataError::Integration(format!(
                    "file source expected JSON object row in {}",
                    self.path.display()
                )));
            };
            records.push(BTreeMap::from_iter(map));
        }

        // Slice the requested window out of the fully parsed file.
        let offset = parse_checkpoint_offset(checkpoint);
        let limit = pipeline.batch_size.max(1);
        let total_rows = records.len();
        let rows = records
            .into_iter()
            .skip(offset)
            .take(limit)
            .collect::<Vec<_>>();
        let next_offset = offset.saturating_add(rows.len());
        // Freshness = file mtime when available; best-effort only.
        let freshness_unix_ms = fs::metadata(&self.path)
            .ok()
            .and_then(|metadata| metadata.modified().ok())
            .and_then(system_time_to_unix_ms);
        Ok(PipelineBatch {
            records: rows,
            next_checkpoint: Some(next_offset.to_string()),
            has_more: next_offset < total_rows,
            lineage: Some(format!("file://{}", self.path.display())),
            source_freshness_unix_ms: freshness_unix_ms,
        })
    }
}
1177
/// Thread-safe in-memory key -> JSON payload store; clones share the same map.
#[derive(Debug, Clone, Default)]
pub struct InMemoryObjectStore {
    inner: Arc<Mutex<HashMap<String, Vec<Value>>>>,
}
1182
1183impl InMemoryObjectStore {
1184 pub fn put(&self, key: &str, payload: Vec<Value>) -> DataResult<()> {
1185 let mut guard = self
1186 .inner
1187 .lock()
1188 .map_err(|_| DataError::Integration("object store lock poisoned".to_string()))?;
1189 guard.insert(key.to_string(), payload);
1190 Ok(())
1191 }
1192
1193 pub fn get(&self, key: &str) -> DataResult<Vec<Value>> {
1194 let guard = self
1195 .inner
1196 .lock()
1197 .map_err(|_| DataError::Integration("object store lock poisoned".to_string()))?;
1198 Ok(guard.get(key).cloned().unwrap_or_default())
1199 }
1200}
1201
/// Source reading JSON object rows from a single key in an in-memory object
/// store, paginated by a numeric offset checkpoint.
#[derive(Debug, Clone)]
pub struct ObjectStoreJsonSource {
    source_name: String,
    object_store: InMemoryObjectStore,
    object_key: String,
}
1208
1209impl ObjectStoreJsonSource {
1210 pub fn new(
1211 source_name: impl Into<String>,
1212 object_store: InMemoryObjectStore,
1213 object_key: impl Into<String>,
1214 ) -> Self {
1215 Self {
1216 source_name: source_name.into(),
1217 object_store,
1218 object_key: object_key.into(),
1219 }
1220 }
1221}
1222
1223impl Source for ObjectStoreJsonSource {
1224 fn source_name(&self) -> &str {
1225 &self.source_name
1226 }
1227
1228 fn read_batch(
1229 &self,
1230 pipeline: &Pipeline,
1231 checkpoint: Option<&str>,
1232 _context: &QueryContext,
1233 ) -> DataResult<PipelineBatch> {
1234 let offset = parse_checkpoint_offset(checkpoint);
1235 let limit = pipeline.batch_size.max(1);
1236 let objects = self.object_store.get(&self.object_key)?;
1237 let total_rows = objects.len();
1238 let mut rows = Vec::new();
1239 for value in objects.into_iter().skip(offset).take(limit) {
1240 let Value::Object(map) = value else {
1241 return Err(DataError::Integration(format!(
1242 "object store source expected JSON object rows for key `{}`",
1243 self.object_key
1244 )));
1245 };
1246 rows.push(BTreeMap::from_iter(map));
1247 }
1248 let next_offset = offset.saturating_add(rows.len());
1249 Ok(PipelineBatch {
1250 records: rows,
1251 next_checkpoint: Some(next_offset.to_string()),
1252 has_more: next_offset < total_rows,
1253 lineage: Some(format!("object://{}", self.object_key)),
1254 source_freshness_unix_ms: Some(now_unix_ms()),
1255 })
1256 }
1257}
1258
/// Mutable interior state of [`IdempotentInMemorySink`], kept behind a mutex.
#[derive(Debug)]
struct IdempotentSinkState {
    // Record field used to derive the dedupe key.
    key_field: String,
    // All rows accepted so far, in arrival order.
    rows: Vec<Row>,
    // Dedupe keys already written; insertion failure means "duplicate".
    seen: HashSet<String>,
}
1265
/// In-memory sink that deduplicates rows by a configurable key field, making
/// repeated writes of the same batch a no-op. Clones share state via `Arc`.
#[derive(Clone)]
pub struct IdempotentInMemorySink {
    sink_name: String,
    state: Arc<Mutex<IdempotentSinkState>>,
    // Optional pre-write hook; conformance probes use it to inject failures.
    probe: Option<Arc<dyn Fn(&QueryContext) -> DataResult<()> + Send + Sync>>,
}
1272
1273impl std::fmt::Debug for IdempotentInMemorySink {
1274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1275 f.debug_struct("IdempotentInMemorySink")
1276 .field("sink_name", &self.sink_name)
1277 .finish_non_exhaustive()
1278 }
1279}
1280
1281impl IdempotentInMemorySink {
1282 pub fn new(sink_name: impl Into<String>, key_field: impl Into<String>) -> Self {
1283 Self {
1284 sink_name: sink_name.into(),
1285 state: Arc::new(Mutex::new(IdempotentSinkState {
1286 key_field: key_field.into(),
1287 rows: Vec::new(),
1288 seen: HashSet::new(),
1289 })),
1290 probe: None,
1291 }
1292 }
1293
1294 pub fn with_probe(
1295 mut self,
1296 probe: Arc<dyn Fn(&QueryContext) -> DataResult<()> + Send + Sync>,
1297 ) -> Self {
1298 self.probe = Some(probe);
1299 self
1300 }
1301
1302 pub fn rows(&self) -> DataResult<Vec<Row>> {
1303 let guard = self
1304 .state
1305 .lock()
1306 .map_err(|_| DataError::Integration("sink state lock poisoned".to_string()))?;
1307 Ok(guard.rows.clone())
1308 }
1309
1310 fn accept_records(&self, records: &[Row]) -> DataResult<(Vec<Row>, usize)> {
1311 let mut guard = self
1312 .state
1313 .lock()
1314 .map_err(|_| DataError::Integration("sink state lock poisoned".to_string()))?;
1315 let key_field = guard.key_field.clone();
1316 let mut accepted = Vec::new();
1317 let mut duplicate_writes = 0usize;
1318 for record in records {
1319 let key = record
1320 .get(&key_field)
1321 .map(Value::to_string)
1322 .unwrap_or_else(|| serde_json::to_string(record).unwrap_or_default());
1323 if !guard.seen.insert(key) {
1324 duplicate_writes = duplicate_writes.saturating_add(1);
1325 continue;
1326 }
1327 guard.rows.push(record.clone());
1328 accepted.push(record.clone());
1329 }
1330 Ok((accepted, duplicate_writes))
1331 }
1332}
1333
1334impl Sink for IdempotentInMemorySink {
1335 fn sink_name(&self) -> &str {
1336 &self.sink_name
1337 }
1338
1339 fn write_batch(
1340 &self,
1341 _pipeline: &Pipeline,
1342 _run: &PipelineRun,
1343 records: &[Row],
1344 context: &QueryContext,
1345 ) -> DataResult<SinkWriteResult> {
1346 if let Some(probe) = &self.probe {
1347 probe(context)?;
1348 }
1349 let (accepted, duplicate_writes) = self.accept_records(records)?;
1350 Ok(SinkWriteResult {
1351 written: accepted.len(),
1352 duplicate_writes,
1353 dead_letters: Vec::new(),
1354 })
1355 }
1356}
1357
/// Sink that appends deduplicated rows to a file in JSON Lines format
/// (one serialized row per line).
#[derive(Debug, Clone)]
pub struct FileJsonLineSink {
    sink_name: String,
    // Target file; created on first write, appended thereafter.
    path: PathBuf,
    // Shared dedupe layer; its probe hook is also honored by this sink.
    dedupe: IdempotentInMemorySink,
}
1364
1365impl FileJsonLineSink {
1366 pub fn new(
1367 sink_name: impl Into<String>,
1368 path: impl Into<PathBuf>,
1369 key_field: impl Into<String>,
1370 ) -> Self {
1371 Self {
1372 sink_name: sink_name.into(),
1373 path: path.into(),
1374 dedupe: IdempotentInMemorySink::new("file-jsonline-dedupe", key_field),
1375 }
1376 }
1377}
1378
1379impl Sink for FileJsonLineSink {
1380 fn sink_name(&self) -> &str {
1381 &self.sink_name
1382 }
1383
1384 fn write_batch(
1385 &self,
1386 _pipeline: &Pipeline,
1387 _run: &PipelineRun,
1388 records: &[Row],
1389 context: &QueryContext,
1390 ) -> DataResult<SinkWriteResult> {
1391 if let Some(probe) = &self.dedupe.probe {
1392 probe(context)?;
1393 }
1394 let (accepted, duplicate_writes) = self.dedupe.accept_records(records)?;
1395 if accepted.is_empty() {
1396 return Ok(SinkWriteResult {
1397 written: 0,
1398 duplicate_writes,
1399 dead_letters: Vec::new(),
1400 });
1401 }
1402
1403 let mut file = OpenOptions::new()
1404 .create(true)
1405 .append(true)
1406 .open(&self.path)?;
1407 for row in &accepted {
1408 let json = serde_json::to_string(row)?;
1409 file.write_all(json.as_bytes())?;
1410 file.write_all(b"\n")?;
1411 }
1412 Ok(SinkWriteResult {
1413 written: accepted.len(),
1414 duplicate_writes,
1415 dead_letters: Vec::new(),
1416 })
1417 }
1418}
1419
/// Sink that appends deduplicated rows to a JSON array stored in an
/// [`InMemoryObjectStore`] under `object_key`.
#[derive(Debug, Clone)]
pub struct ObjectStoreJsonSink {
    sink_name: String,
    object_store: InMemoryObjectStore,
    // Key of the stored object receiving appended rows.
    object_key: String,
    // Shared dedupe layer; its probe hook is also honored by this sink.
    dedupe: IdempotentInMemorySink,
}
1427
1428impl ObjectStoreJsonSink {
1429 pub fn new(
1430 sink_name: impl Into<String>,
1431 object_store: InMemoryObjectStore,
1432 object_key: impl Into<String>,
1433 key_field: impl Into<String>,
1434 ) -> Self {
1435 Self {
1436 sink_name: sink_name.into(),
1437 object_store,
1438 object_key: object_key.into(),
1439 dedupe: IdempotentInMemorySink::new("object-json-dedupe", key_field),
1440 }
1441 }
1442}
1443
impl Sink for ObjectStoreJsonSink {
    fn sink_name(&self) -> &str {
        &self.sink_name
    }

    /// Appends accepted (first-time) rows to the stored JSON array; duplicate
    /// rows are counted but not written.
    ///
    /// NOTE(review): the get-then-put below is not atomic — a concurrent
    /// writer to the same key between the two calls would have its rows
    /// overwritten. Safe under single-writer-per-key usage; fixing it would
    /// need an atomic append on `InMemoryObjectStore`.
    fn write_batch(
        &self,
        _pipeline: &Pipeline,
        _run: &PipelineRun,
        records: &[Row],
        context: &QueryContext,
    ) -> DataResult<SinkWriteResult> {
        // Honor the dedupe layer's failure-injection probe, if installed.
        if let Some(probe) = &self.dedupe.probe {
            probe(context)?;
        }
        let (accepted, duplicate_writes) = self.dedupe.accept_records(records)?;
        if accepted.is_empty() {
            return Ok(SinkWriteResult {
                written: 0,
                duplicate_writes,
                dead_letters: Vec::new(),
            });
        }

        // Read-modify-write of the stored array (see NOTE above).
        let mut payload = self.object_store.get(&self.object_key)?;
        for row in &accepted {
            payload.push(serde_json::to_value(row)?);
        }
        self.object_store.put(&self.object_key, payload)?;
        Ok(SinkWriteResult {
            written: accepted.len(),
            duplicate_writes,
            dead_letters: Vec::new(),
        })
    }
}
1480
/// Pipeline source that pages rows out of SingleStore via a windowed query.
#[derive(Debug, Clone)]
pub struct SingleStorePipelineSource<A: SingleStoreAdapter + ?Sized> {
    source_name: String,
    adapter: Arc<A>,
    // Query issued for every window read.
    query: SqlCommand,
    dataset: String,
    contract: AdapterCallContract,
    // Per-read sleep, used by timeout conformance probes; 0 disables it.
    artificial_delay_ms: u64,
}
1490
1491impl<A: SingleStoreAdapter + ?Sized> SingleStorePipelineSource<A> {
1492 pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
1493 Self {
1494 source_name: "singlestore".to_string(),
1495 adapter,
1496 query,
1497 dataset: dataset.into(),
1498 contract: AdapterCallContract::default_query(),
1499 artificial_delay_ms: 0,
1500 }
1501 }
1502
1503 pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1504 self.contract = contract;
1505 self
1506 }
1507
1508 pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
1509 self.artificial_delay_ms = artificial_delay_ms;
1510 self
1511 }
1512}
1513
1514impl<A: SingleStoreAdapter + ?Sized + Send + Sync> Source for SingleStorePipelineSource<A> {
1515 fn source_name(&self) -> &str {
1516 &self.source_name
1517 }
1518
1519 fn read_batch(
1520 &self,
1521 pipeline: &Pipeline,
1522 checkpoint: Option<&str>,
1523 context: &QueryContext,
1524 ) -> DataResult<PipelineBatch> {
1525 let offset = parse_checkpoint_offset(checkpoint);
1526 let request = DataWindowRequest {
1527 dataset: self.dataset.clone(),
1528 offset,
1529 limit: pipeline.batch_size.max(1),
1530 query_token: None,
1531 window_token: checkpoint.map(ToString::to_string),
1532 wire_format: WireFormatProfile::Json,
1533 };
1534 let response = map_integration_result(
1535 "singlestore.pipeline_source",
1536 run_with_contract(
1537 "singlestore",
1538 "pipeline_source.read_batch",
1539 self.contract,
1540 context,
1541 |_| {
1542 if self.artificial_delay_ms > 0 {
1543 thread::sleep(Duration::from_millis(self.artificial_delay_ms));
1544 }
1545 self.adapter.run_high_volume_window_query(
1546 self.query.clone(),
1547 request.clone(),
1548 context,
1549 )
1550 },
1551 ),
1552 )?;
1553 let next_offset = response.offset.saturating_add(response.rows.len());
1554 Ok(PipelineBatch {
1555 records: response.rows,
1556 next_checkpoint: Some(next_offset.to_string()),
1557 has_more: next_offset < response.total_rows,
1558 lineage: Some(response.query_token),
1559 source_freshness_unix_ms: Some(now_unix_ms()),
1560 })
1561 }
1562}
1563
/// Pipeline source that pages rows out of ClickHouse via a windowed query.
#[derive(Debug, Clone)]
pub struct ClickHousePipelineSource<A: ClickHouseAdapter + ?Sized> {
    source_name: String,
    adapter: Arc<A>,
    // Query issued for every window read.
    query: SqlCommand,
    dataset: String,
    contract: AdapterCallContract,
    // Per-read sleep, used by timeout conformance probes; 0 disables it.
    artificial_delay_ms: u64,
}
1573
1574impl<A: ClickHouseAdapter + ?Sized> ClickHousePipelineSource<A> {
1575 pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
1576 Self {
1577 source_name: "clickhouse".to_string(),
1578 adapter,
1579 query,
1580 dataset: dataset.into(),
1581 contract: AdapterCallContract::default_query(),
1582 artificial_delay_ms: 0,
1583 }
1584 }
1585
1586 pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1587 self.contract = contract;
1588 self
1589 }
1590
1591 pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
1592 self.artificial_delay_ms = artificial_delay_ms;
1593 self
1594 }
1595}
1596
1597impl<A: ClickHouseAdapter + ?Sized + Send + Sync> Source for ClickHousePipelineSource<A> {
1598 fn source_name(&self) -> &str {
1599 &self.source_name
1600 }
1601
1602 fn read_batch(
1603 &self,
1604 pipeline: &Pipeline,
1605 checkpoint: Option<&str>,
1606 context: &QueryContext,
1607 ) -> DataResult<PipelineBatch> {
1608 let offset = parse_checkpoint_offset(checkpoint);
1609 let request = DataWindowRequest {
1610 dataset: self.dataset.clone(),
1611 offset,
1612 limit: pipeline.batch_size.max(1),
1613 query_token: None,
1614 window_token: checkpoint.map(ToString::to_string),
1615 wire_format: WireFormatProfile::Json,
1616 };
1617 let response = map_integration_result(
1618 "clickhouse.pipeline_source",
1619 run_with_contract(
1620 "clickhouse",
1621 "pipeline_source.read_batch",
1622 self.contract,
1623 context,
1624 |_| {
1625 if self.artificial_delay_ms > 0 {
1626 thread::sleep(Duration::from_millis(self.artificial_delay_ms));
1627 }
1628 self.adapter.run_high_volume_window_query(
1629 self.query.clone(),
1630 request.clone(),
1631 context,
1632 )
1633 },
1634 ),
1635 )?;
1636 let next_offset = response.offset.saturating_add(response.rows.len());
1637 Ok(PipelineBatch {
1638 records: response.rows,
1639 next_checkpoint: Some(next_offset.to_string()),
1640 has_more: next_offset < response.total_rows,
1641 lineage: Some(response.query_token),
1642 source_freshness_unix_ms: Some(now_unix_ms()),
1643 })
1644 }
1645}
1646
/// Pipeline source that pages rows out of BigQuery via a windowed query.
#[derive(Debug, Clone)]
pub struct BigQueryPipelineSource<A: BigQueryAdapter + ?Sized> {
    source_name: String,
    adapter: Arc<A>,
    // Query issued for every window read.
    query: SqlCommand,
    dataset: String,
    contract: AdapterCallContract,
    // Per-read sleep, used by timeout conformance probes; 0 disables it.
    artificial_delay_ms: u64,
}
1656
1657impl<A: BigQueryAdapter + ?Sized> BigQueryPipelineSource<A> {
1658 pub fn new(adapter: Arc<A>, query: SqlCommand, dataset: impl Into<String>) -> Self {
1659 Self {
1660 source_name: "bigquery".to_string(),
1661 adapter,
1662 query,
1663 dataset: dataset.into(),
1664 contract: AdapterCallContract::default_query(),
1665 artificial_delay_ms: 0,
1666 }
1667 }
1668
1669 pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1670 self.contract = contract;
1671 self
1672 }
1673
1674 pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
1675 self.artificial_delay_ms = artificial_delay_ms;
1676 self
1677 }
1678}
1679
1680impl<A: BigQueryAdapter + ?Sized + Send + Sync> Source for BigQueryPipelineSource<A> {
1681 fn source_name(&self) -> &str {
1682 &self.source_name
1683 }
1684
1685 fn read_batch(
1686 &self,
1687 pipeline: &Pipeline,
1688 checkpoint: Option<&str>,
1689 context: &QueryContext,
1690 ) -> DataResult<PipelineBatch> {
1691 let offset = parse_checkpoint_offset(checkpoint);
1692 let request = DataWindowRequest {
1693 dataset: self.dataset.clone(),
1694 offset,
1695 limit: pipeline.batch_size.max(1),
1696 query_token: None,
1697 window_token: checkpoint.map(ToString::to_string),
1698 wire_format: WireFormatProfile::Json,
1699 };
1700 let response = map_integration_result(
1701 "bigquery.pipeline_source",
1702 run_with_contract(
1703 "bigquery",
1704 "pipeline_source.read_batch",
1705 self.contract,
1706 context,
1707 |_| {
1708 if self.artificial_delay_ms > 0 {
1709 thread::sleep(Duration::from_millis(self.artificial_delay_ms));
1710 }
1711 self.adapter.run_high_volume_window_query(
1712 self.query.clone(),
1713 request.clone(),
1714 context,
1715 )
1716 },
1717 ),
1718 )?;
1719 let next_offset = response.offset.saturating_add(response.rows.len());
1720 Ok(PipelineBatch {
1721 records: response.rows,
1722 next_checkpoint: Some(next_offset.to_string()),
1723 has_more: next_offset < response.total_rows,
1724 lineage: Some(response.query_token),
1725 source_freshness_unix_ms: Some(now_unix_ms()),
1726 })
1727 }
1728}
1729
/// Pipeline source that pages documents out of OpenSearch using a windowed
/// search built from `request_template`.
#[derive(Debug, Clone)]
pub struct OpenSearchPipelineSource<A: OpenSearchAdapter + ?Sized> {
    source_name: String,
    adapter: Arc<A>,
    // Search request reissued (with a fresh window) for every batch.
    request_template: SearchRequest,
    contract: AdapterCallContract,
    // Per-read sleep, used by timeout conformance probes; 0 disables it.
    artificial_delay_ms: u64,
}
1738
1739impl<A: OpenSearchAdapter + ?Sized> OpenSearchPipelineSource<A> {
1740 pub fn new(adapter: Arc<A>, request_template: SearchRequest) -> Self {
1741 Self {
1742 source_name: "opensearch".to_string(),
1743 adapter,
1744 request_template,
1745 contract: AdapterCallContract::default_query(),
1746 artificial_delay_ms: 0,
1747 }
1748 }
1749
1750 pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1751 self.contract = contract;
1752 self
1753 }
1754
1755 pub fn with_artificial_delay_ms(mut self, artificial_delay_ms: u64) -> Self {
1756 self.artificial_delay_ms = artificial_delay_ms;
1757 self
1758 }
1759}
1760
1761impl<A: OpenSearchAdapter + ?Sized + Send + Sync> Source for OpenSearchPipelineSource<A> {
1762 fn source_name(&self) -> &str {
1763 &self.source_name
1764 }
1765
1766 fn read_batch(
1767 &self,
1768 pipeline: &Pipeline,
1769 checkpoint: Option<&str>,
1770 context: &QueryContext,
1771 ) -> DataResult<PipelineBatch> {
1772 let offset = parse_checkpoint_offset(checkpoint);
1773 let window = DataWindowRequest {
1774 dataset: self.request_template.index.clone(),
1775 offset,
1776 limit: pipeline.batch_size.max(1),
1777 query_token: None,
1778 window_token: checkpoint.map(ToString::to_string),
1779 wire_format: WireFormatProfile::Json,
1780 };
1781 let response = map_integration_result(
1782 "opensearch.pipeline_source",
1783 run_with_contract(
1784 "opensearch",
1785 "pipeline_source.read_batch",
1786 self.contract,
1787 context,
1788 |_| {
1789 if self.artificial_delay_ms > 0 {
1790 thread::sleep(Duration::from_millis(self.artificial_delay_ms));
1791 }
1792 self.adapter.search_window(
1793 self.request_template.clone(),
1794 window.clone(),
1795 context,
1796 )
1797 },
1798 ),
1799 )?;
1800 let next_offset = response.offset.saturating_add(response.rows.len());
1801 Ok(PipelineBatch {
1802 records: response.rows,
1803 next_checkpoint: Some(next_offset.to_string()),
1804 has_more: next_offset < response.total_rows,
1805 lineage: Some(response.query_token),
1806 source_freshness_unix_ms: Some(now_unix_ms()),
1807 })
1808 }
1809}
1810
/// Sink that probes SingleStore connectivity before each batch, then stores
/// rows through an idempotent in-memory sink.
#[derive(Clone)]
pub struct SingleStorePipelineSink<A: SingleStoreAdapter + ?Sized> {
    adapter: Arc<A>,
    // Statement executed as a reachability probe before every write.
    probe_statement: String,
    contract: AdapterCallContract,
    // Dedupe layer that performs the actual (in-memory) write.
    inner: IdempotentInMemorySink,
}
1818
1819impl<A: SingleStoreAdapter + ?Sized> std::fmt::Debug for SingleStorePipelineSink<A> {
1820 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1821 f.debug_struct("SingleStorePipelineSink")
1822 .finish_non_exhaustive()
1823 }
1824}
1825
1826impl<A: SingleStoreAdapter + ?Sized> SingleStorePipelineSink<A> {
1827 pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
1828 Self {
1829 adapter,
1830 probe_statement: "SELECT 1".to_string(),
1831 contract: AdapterCallContract::default_query(),
1832 inner: IdempotentInMemorySink::new("singlestore-pipeline-sink", key_field),
1833 }
1834 }
1835
1836 pub fn with_probe_statement(mut self, statement: impl Into<String>) -> Self {
1837 self.probe_statement = statement.into();
1838 self
1839 }
1840
1841 pub fn with_contract(mut self, contract: AdapterCallContract) -> Self {
1842 self.contract = contract;
1843 self
1844 }
1845}
1846
1847impl<A: SingleStoreAdapter + ?Sized + Send + Sync> Sink for SingleStorePipelineSink<A> {
1848 fn sink_name(&self) -> &str {
1849 "singlestore"
1850 }
1851
1852 fn write_batch(
1853 &self,
1854 pipeline: &Pipeline,
1855 run: &PipelineRun,
1856 records: &[Row],
1857 context: &QueryContext,
1858 ) -> DataResult<SinkWriteResult> {
1859 map_integration_result(
1860 "singlestore.pipeline_sink",
1861 run_with_contract(
1862 "singlestore",
1863 "pipeline_sink.write_batch",
1864 self.contract,
1865 context,
1866 |_| {
1867 self.adapter
1868 .run_query(
1869 SqlCommand::new(self.probe_statement.clone(), Vec::new()),
1870 context,
1871 )
1872 .map(|_| ())
1873 },
1874 ),
1875 )?;
1876 self.inner.write_batch(pipeline, run, records, context)
1877 }
1878}
1879
/// Sink that probes ClickHouse connectivity before each batch, then stores
/// rows through an idempotent in-memory sink.
#[derive(Clone)]
pub struct ClickHousePipelineSink<A: ClickHouseAdapter + ?Sized> {
    adapter: Arc<A>,
    // Statement executed as a reachability probe before every write.
    probe_statement: String,
    contract: AdapterCallContract,
    // Dedupe layer that performs the actual (in-memory) write.
    inner: IdempotentInMemorySink,
}
1887
1888impl<A: ClickHouseAdapter + ?Sized> std::fmt::Debug for ClickHousePipelineSink<A> {
1889 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1890 f.debug_struct("ClickHousePipelineSink")
1891 .finish_non_exhaustive()
1892 }
1893}
1894
1895impl<A: ClickHouseAdapter + ?Sized> ClickHousePipelineSink<A> {
1896 pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
1897 Self {
1898 adapter,
1899 probe_statement: "SELECT 1".to_string(),
1900 contract: AdapterCallContract::default_query(),
1901 inner: IdempotentInMemorySink::new("clickhouse-pipeline-sink", key_field),
1902 }
1903 }
1904}
1905
1906impl<A: ClickHouseAdapter + ?Sized + Send + Sync> Sink for ClickHousePipelineSink<A> {
1907 fn sink_name(&self) -> &str {
1908 "clickhouse"
1909 }
1910
1911 fn write_batch(
1912 &self,
1913 pipeline: &Pipeline,
1914 run: &PipelineRun,
1915 records: &[Row],
1916 context: &QueryContext,
1917 ) -> DataResult<SinkWriteResult> {
1918 map_integration_result(
1919 "clickhouse.pipeline_sink",
1920 run_with_contract(
1921 "clickhouse",
1922 "pipeline_sink.write_batch",
1923 self.contract,
1924 context,
1925 |_| {
1926 self.adapter
1927 .run_query(
1928 SqlCommand::new(self.probe_statement.clone(), Vec::new()),
1929 context,
1930 )
1931 .map(|_| ())
1932 },
1933 ),
1934 )?;
1935 self.inner.write_batch(pipeline, run, records, context)
1936 }
1937}
1938
/// Sink that probes BigQuery connectivity before each batch, then stores
/// rows through an idempotent in-memory sink.
#[derive(Clone)]
pub struct BigQueryPipelineSink<A: BigQueryAdapter + ?Sized> {
    adapter: Arc<A>,
    // Statement executed as a reachability probe before every write.
    probe_statement: String,
    contract: AdapterCallContract,
    // Dedupe layer that performs the actual (in-memory) write.
    inner: IdempotentInMemorySink,
}
1946
1947impl<A: BigQueryAdapter + ?Sized> std::fmt::Debug for BigQueryPipelineSink<A> {
1948 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1949 f.debug_struct("BigQueryPipelineSink")
1950 .finish_non_exhaustive()
1951 }
1952}
1953
1954impl<A: BigQueryAdapter + ?Sized> BigQueryPipelineSink<A> {
1955 pub fn new(adapter: Arc<A>, key_field: impl Into<String>) -> Self {
1956 Self {
1957 adapter,
1958 probe_statement: "SELECT 1".to_string(),
1959 contract: AdapterCallContract::default_query(),
1960 inner: IdempotentInMemorySink::new("bigquery-pipeline-sink", key_field),
1961 }
1962 }
1963}
1964
1965impl<A: BigQueryAdapter + ?Sized + Send + Sync> Sink for BigQueryPipelineSink<A> {
1966 fn sink_name(&self) -> &str {
1967 "bigquery"
1968 }
1969
1970 fn write_batch(
1971 &self,
1972 pipeline: &Pipeline,
1973 run: &PipelineRun,
1974 records: &[Row],
1975 context: &QueryContext,
1976 ) -> DataResult<SinkWriteResult> {
1977 map_integration_result(
1978 "bigquery.pipeline_sink",
1979 run_with_contract(
1980 "bigquery",
1981 "pipeline_sink.write_batch",
1982 self.contract,
1983 context,
1984 |_| {
1985 self.adapter
1986 .run_query(
1987 SqlCommand::new(self.probe_statement.clone(), Vec::new()),
1988 context,
1989 )
1990 .map(|_| ())
1991 },
1992 ),
1993 )?;
1994 self.inner.write_batch(pipeline, run, records, context)
1995 }
1996}
1997
/// Sink that probes OpenSearch reachability with a search request before each
/// batch, then stores rows through an idempotent in-memory sink.
#[derive(Clone)]
pub struct OpenSearchPipelineSink<A: OpenSearchAdapter + ?Sized> {
    adapter: Arc<A>,
    // Search request issued as a reachability probe before every write.
    probe_request: SearchRequest,
    contract: AdapterCallContract,
    // Dedupe layer that performs the actual (in-memory) write.
    inner: IdempotentInMemorySink,
}
2005
2006impl<A: OpenSearchAdapter + ?Sized> std::fmt::Debug for OpenSearchPipelineSink<A> {
2007 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2008 f.debug_struct("OpenSearchPipelineSink")
2009 .finish_non_exhaustive()
2010 }
2011}
2012
2013impl<A: OpenSearchAdapter + ?Sized> OpenSearchPipelineSink<A> {
2014 pub fn new(
2015 adapter: Arc<A>,
2016 key_field: impl Into<String>,
2017 probe_index: impl Into<String>,
2018 ) -> Self {
2019 Self {
2020 adapter,
2021 probe_request: SearchRequest::new(probe_index, ""),
2022 contract: AdapterCallContract::default_query(),
2023 inner: IdempotentInMemorySink::new("opensearch-pipeline-sink", key_field),
2024 }
2025 }
2026}
2027
2028impl<A: OpenSearchAdapter + ?Sized + Send + Sync> Sink for OpenSearchPipelineSink<A> {
2029 fn sink_name(&self) -> &str {
2030 "opensearch"
2031 }
2032
2033 fn write_batch(
2034 &self,
2035 pipeline: &Pipeline,
2036 run: &PipelineRun,
2037 records: &[Row],
2038 context: &QueryContext,
2039 ) -> DataResult<SinkWriteResult> {
2040 map_integration_result(
2041 "opensearch.pipeline_sink",
2042 run_with_contract(
2043 "opensearch",
2044 "pipeline_sink.write_batch",
2045 self.contract,
2046 context,
2047 |_| {
2048 self.adapter
2049 .search(self.probe_request.clone(), context)
2050 .map(|_| ())
2051 },
2052 ),
2053 )?;
2054 self.inner.write_batch(pipeline, run, records, context)
2055 }
2056}
2057
/// Outcome of a single conformance probe: a check name, pass/fail flag, and a
/// human-readable detail string.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineConformanceCheck {
    pub name: String,
    pub passed: bool,
    pub detail: String,
}
2064
/// Aggregated result of the conformance suite; one entry per probe executed.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PipelineConformanceReport {
    pub checks: Vec<PipelineConformanceCheck>,
}
2069
2070impl PipelineConformanceReport {
2071 pub fn passed(&self) -> bool {
2072 self.checks.iter().all(|check| check.passed)
2073 }
2074
2075 fn add_pass(&mut self, name: impl Into<String>, detail: impl Into<String>) {
2076 self.checks.push(PipelineConformanceCheck {
2077 name: name.into(),
2078 passed: true,
2079 detail: detail.into(),
2080 });
2081 }
2082
2083 fn add_fail(&mut self, name: impl Into<String>, detail: impl Into<String>) {
2084 self.checks.push(PipelineConformanceCheck {
2085 name: name.into(),
2086 passed: false,
2087 detail: detail.into(),
2088 });
2089 }
2090}
2091
/// Runs the M60 pipeline conformance suite against all four adapter drivers.
///
/// Three probe families run per driver — checkpoint resume, retry, and
/// timeout — and each outcome is recorded as a named check
/// (`<driver>.<family>`) in the returned report. Probe failures become
/// failed checks; this function itself does not return an error.
pub fn run_pipeline_adapter_conformance_suite(
    singlestore: Arc<dyn SingleStoreAdapter>,
    clickhouse: Arc<dyn ClickHouseAdapter>,
    bigquery: Arc<dyn BigQueryAdapter>,
    opensearch: Arc<dyn OpenSearchAdapter>,
    context: &QueryContext,
) -> PipelineConformanceReport {
    let mut report = PipelineConformanceReport::default();

    // Shared probe inputs: one SQL window query for the SQL-style drivers
    // and one empty-query search for OpenSearch.
    let sql_query = SqlCommand::new(
        "SELECT tenant, active_accounts FROM tenant_rollup",
        Vec::new(),
    );
    let search_request = SearchRequest::new("accounts", "");

    // Family 1: checkpoint resume — run one tiny batch, pause, resume, then
    // repeat; the repeat must write nothing (idempotent sink).
    let mut checkpoint_resume_checks = Vec::new();
    checkpoint_resume_checks.push(run_checkpoint_resume_probe(
        "singlestore",
        Pipeline::new(
            "m60-singlestore-checkpoint",
            "M60 SingleStore Checkpoint",
            "singlestore",
            "passthrough",
            "idempotent_sink",
        ),
        Arc::new(SingleStorePipelineSource::new(
            singlestore.clone(),
            sql_query.clone(),
            "tenant_rollup",
        )),
        Arc::new(IdempotentInMemorySink::new(
            "m60-sink-singlestore",
            "tenant",
        )),
        context.clone(),
    ));
    checkpoint_resume_checks.push(run_checkpoint_resume_probe(
        "clickhouse",
        Pipeline::new(
            "m60-clickhouse-checkpoint",
            "M60 ClickHouse Checkpoint",
            "clickhouse",
            "passthrough",
            "idempotent_sink",
        ),
        Arc::new(ClickHousePipelineSource::new(
            clickhouse.clone(),
            sql_query.clone(),
            "tenant_rollup",
        )),
        Arc::new(IdempotentInMemorySink::new("m60-sink-clickhouse", "tenant")),
        context.clone(),
    ));
    checkpoint_resume_checks.push(run_checkpoint_resume_probe(
        "bigquery",
        Pipeline::new(
            "m60-bigquery-checkpoint",
            "M60 BigQuery Checkpoint",
            "bigquery",
            "passthrough",
            "idempotent_sink",
        ),
        Arc::new(BigQueryPipelineSource::new(
            bigquery.clone(),
            sql_query.clone(),
            "tenant_rollup",
        )),
        Arc::new(IdempotentInMemorySink::new("m60-sink-bigquery", "tenant")),
        context.clone(),
    ));
    checkpoint_resume_checks.push(run_checkpoint_resume_probe(
        "opensearch",
        Pipeline::new(
            "m60-opensearch-checkpoint",
            "M60 OpenSearch Checkpoint",
            "opensearch",
            "passthrough",
            "idempotent_sink",
        ),
        Arc::new(OpenSearchPipelineSource::new(
            opensearch.clone(),
            search_request.clone(),
        )),
        Arc::new(IdempotentInMemorySink::new(
            "m60-sink-opensearch",
            "account",
        )),
        context.clone(),
    ));

    for (driver, passed, detail) in checkpoint_resume_checks {
        if passed {
            report.add_pass(format!("{driver}.checkpoint_resume"), detail);
        } else {
            report.add_fail(format!("{driver}.checkpoint_resume"), detail);
        }
    }

    // Family 2: retry — a sink probe fails on the first attempt; the run must
    // still succeed with at least one recorded retry.
    let mut retry_checks = Vec::new();
    retry_checks.push(run_retry_probe(
        "singlestore",
        Arc::new(SingleStorePipelineSource::new(
            singlestore.clone(),
            sql_query.clone(),
            "tenant_rollup",
        )),
        context.clone(),
    ));
    retry_checks.push(run_retry_probe(
        "clickhouse",
        Arc::new(ClickHousePipelineSource::new(
            clickhouse.clone(),
            sql_query.clone(),
            "tenant_rollup",
        )),
        context.clone(),
    ));
    retry_checks.push(run_retry_probe(
        "bigquery",
        Arc::new(BigQueryPipelineSource::new(
            bigquery.clone(),
            sql_query.clone(),
            "tenant_rollup",
        )),
        context.clone(),
    ));
    retry_checks.push(run_retry_probe(
        "opensearch",
        Arc::new(OpenSearchPipelineSource::new(
            opensearch.clone(),
            search_request.clone(),
        )),
        context.clone(),
    ));

    for (driver, passed, detail) in retry_checks {
        if passed {
            report.add_pass(format!("{driver}.retry"), detail);
        } else {
            report.add_fail(format!("{driver}.retry"), detail);
        }
    }

    // Family 3: timeout — a 5ms artificial read delay against a 1ms context
    // timeout must surface as a timeout, not a hang or success.
    let mut timeout_checks = Vec::new();
    timeout_checks.push(run_timeout_probe(
        "singlestore",
        Arc::new(
            SingleStorePipelineSource::new(singlestore, sql_query.clone(), "tenant_rollup")
                .with_artificial_delay_ms(5),
        ),
        context.clone().with_timeout_ms(1),
    ));
    timeout_checks.push(run_timeout_probe(
        "clickhouse",
        Arc::new(
            ClickHousePipelineSource::new(clickhouse, sql_query.clone(), "tenant_rollup")
                .with_artificial_delay_ms(5),
        ),
        context.clone().with_timeout_ms(1),
    ));
    timeout_checks.push(run_timeout_probe(
        "bigquery",
        Arc::new(
            BigQueryPipelineSource::new(bigquery, sql_query, "tenant_rollup")
                .with_artificial_delay_ms(5),
        ),
        context.clone().with_timeout_ms(1),
    ));
    timeout_checks.push(run_timeout_probe(
        "opensearch",
        Arc::new(
            OpenSearchPipelineSource::new(opensearch, search_request).with_artificial_delay_ms(5),
        ),
        context.clone().with_timeout_ms(1),
    ));

    for (driver, passed, detail) in timeout_checks {
        if passed {
            report.add_pass(format!("{driver}.timeout"), detail);
        } else {
            report.add_fail(format!("{driver}.timeout"), detail);
        }
    }

    report
}
2278
/// Checkpoint-resume probe for one driver.
///
/// Stage 1 runs the pipeline with batch size 1 and a one-batch cap, expecting
/// it to pause with a saved checkpoint. Stage 2 resumes against the same
/// checkpoint store and expects full success. Stage 3 repeats the run and
/// expects zero rows written (the idempotent sink must dedupe everything).
/// Returns `(driver, passed, detail)`.
fn run_checkpoint_resume_probe(
    driver: &str,
    base_pipeline: Pipeline,
    source: Arc<dyn Source>,
    sink: Arc<dyn Sink>,
    context: QueryContext,
) -> (String, bool, String) {
    // Shared across both runtimes so the second run can resume from the
    // checkpoint the first run saved.
    let checkpoint_store = Arc::new(InMemoryCheckpointStore::default());
    let dead_letters = Arc::new(InMemoryDeadLetterStore::default());
    let events = Arc::new(InMemoryPipelineEventStore::default());
    // Stage 1: one batch only, so the run pauses with a checkpoint.
    let first_runtime = PipelineRuntime::new(
        base_pipeline
            .clone()
            .with_batch_size(1)
            .with_max_batches_per_run(Some(1)),
        source.clone(),
        Arc::new(PassthroughTransform),
        sink.clone(),
        checkpoint_store.clone(),
        dead_letters.clone(),
        events.clone(),
    );
    let first = first_runtime.execute_manual(context.clone(), PipelineControl::default(), None);
    let Ok(first_run) = first else {
        return (
            driver.to_string(),
            false,
            format!("initial run failed: {}", first.unwrap_err()),
        );
    };
    if first_run.status != PipelineRunStatus::Paused {
        return (
            driver.to_string(),
            false,
            format!("expected first status paused, got {:?}", first_run.status),
        );
    }
    if first_run.last_checkpoint.is_none() {
        return (
            driver.to_string(),
            false,
            "first run did not save checkpoint".to_string(),
        );
    }

    // Stage 2: resume with no batch cap; should drain to completion.
    let second_runtime = PipelineRuntime::new(
        base_pipeline.with_batch_size(1),
        source,
        Arc::new(PassthroughTransform),
        sink.clone(),
        checkpoint_store,
        dead_letters,
        events,
    );
    let second = second_runtime.execute_manual(context.clone(), PipelineControl::default(), None);
    let Ok(second_run) = second else {
        return (
            driver.to_string(),
            false,
            format!("resume run failed: {}", second.unwrap_err()),
        );
    };
    if second_run.status != PipelineRunStatus::Succeeded {
        return (
            driver.to_string(),
            false,
            format!(
                "expected resume status succeeded, got {:?}",
                second_run.status
            ),
        );
    }

    // Stage 3: repeat the finished run; the idempotent sink must reject
    // every row as a duplicate.
    let third = second_runtime.execute_manual(context, PipelineControl::default(), None);
    let Ok(third_run) = third else {
        return (
            driver.to_string(),
            false,
            format!("repeat run failed: {}", third.unwrap_err()),
        );
    };
    if third_run.rows_written != 0 {
        return (
            driver.to_string(),
            false,
            format!(
                "expected idempotent repeat run writes=0 got {}",
                third_run.rows_written
            ),
        );
    }

    (
        driver.to_string(),
        true,
        format!(
            "resume status={:?}, repeat writes={}",
            second_run.status, third_run.rows_written
        ),
    )
}
2380
/// Retry probe for one driver: a sink whose probe fails exactly once must
/// cause the runtime to retry and still finish the run successfully.
/// Returns `(driver, passed, detail)`.
fn run_retry_probe(
    driver: &str,
    source: Arc<dyn Source>,
    context: QueryContext,
) -> (String, bool, String) {
    // Counts probe invocations; the first one (attempt 0) is forced to fail.
    let attempts = Arc::new(AtomicUsize::new(0));
    let attempts_clone = attempts.clone();
    let sink = Arc::new(
        IdempotentInMemorySink::new(format!("m60-flaky-{driver}"), "tenant").with_probe(Arc::new(
            move |_ctx| {
                let attempt = attempts_clone.fetch_add(1, Ordering::SeqCst);
                if attempt == 0 {
                    return Err(DataError::Integration(
                        "transient sink probe failure".to_string(),
                    ));
                }
                Ok(())
            },
        )),
    );
    // Two attempts with zero backoff keeps the probe fast and deterministic.
    let runtime = PipelineRuntime::new(
        Pipeline::new(
            format!("m60-{driver}-retry"),
            format!("M60 {driver} Retry Probe"),
            driver,
            "passthrough",
            "flaky_sink",
        )
        .with_batch_size(1)
        .with_retry_policy(RetryPolicy {
            max_attempts: 2,
            initial_backoff_ms: 0,
            max_backoff_ms: 0,
        }),
        source,
        Arc::new(PassthroughTransform),
        sink,
        Arc::new(InMemoryCheckpointStore::default()),
        Arc::new(InMemoryDeadLetterStore::default()),
        Arc::new(InMemoryPipelineEventStore::default()),
    );
    // Pass only if the run succeeded AND at least one retry was recorded.
    match runtime.execute_manual(context, PipelineControl::default(), None) {
        Ok(run) if run.status == PipelineRunStatus::Succeeded && run.retries >= 1 => {
            (driver.to_string(), true, format!("retries={}", run.retries))
        }
        Ok(run) => (
            driver.to_string(),
            false,
            format!(
                "unexpected run status={:?} retries={}",
                run.status, run.retries
            ),
        ),
        Err(err) => (
            driver.to_string(),
            false,
            format!("retry probe failed: {err}"),
        ),
    }
}
2441
2442fn run_timeout_probe(
2443 driver: &str,
2444 source: Arc<dyn Source>,
2445 context: QueryContext,
2446) -> (String, bool, String) {
2447 let runtime = PipelineRuntime::new(
2448 Pipeline::new(
2449 format!("m60-{driver}-timeout"),
2450 format!("M60 {driver} Timeout Probe"),
2451 driver,
2452 "passthrough",
2453 "noop_sink",
2454 )
2455 .with_batch_size(1)
2456 .with_deadline_ms(1_000)
2457 .with_retry_policy(RetryPolicy::never()),
2458 source,
2459 Arc::new(PassthroughTransform),
2460 Arc::new(IdempotentInMemorySink::new("m60-timeout-sink", "tenant")),
2461 Arc::new(InMemoryCheckpointStore::default()),
2462 Arc::new(InMemoryDeadLetterStore::default()),
2463 Arc::new(InMemoryPipelineEventStore::default()),
2464 );
2465 match runtime.execute_manual(context, PipelineControl::default(), None) {
2466 Ok(run) => {
2467 let detail = run.errors.join(" | ");
2468 let passed = run.status == PipelineRunStatus::Failed
2469 && detail.to_ascii_lowercase().contains("timeout");
2470 (
2471 driver.to_string(),
2472 passed,
2473 if passed {
2474 "timeout classification observed".to_string()
2475 } else {
2476 format!(
2477 "expected failed timeout run, got status={:?} errors={detail}",
2478 run.status
2479 )
2480 },
2481 )
2482 }
2483 Err(err) => (driver.to_string(), false, format!("runtime failed: {err}")),
2484 }
2485}
2486
/// Extracts a row offset from a checkpoint token.
///
/// Accepted forms: a bare integer (`"7"`), or any string embedding an
/// `offset=<n>` segment terminated by `:` or end-of-string (e.g.
/// `"query:offset=9:limit=1"`). Missing, empty, or unparsable checkpoints
/// resolve to offset 0.
fn parse_checkpoint_offset(checkpoint: Option<&str>) -> usize {
    checkpoint
        .map(str::trim)
        .filter(|token| !token.is_empty())
        .and_then(|token| {
            // Bare integer first; otherwise look for an embedded offset= segment.
            token.parse::<usize>().ok().or_else(|| {
                token
                    .split("offset=")
                    .nth(1)
                    .and_then(|tail| tail.split(':').next())
                    .and_then(|digits| digits.parse::<usize>().ok())
            })
        })
        .unwrap_or(0)
}
2508
2509fn is_retryable_pipeline_error(err: &DataError) -> bool {
2510 match err {
2511 DataError::Integration(message) | DataError::Query(message) => {
2512 let normalized = message.to_ascii_lowercase();
2513 normalized.contains("transient")
2514 || normalized.contains("timeout")
2515 || normalized.contains("unavailable")
2516 || normalized.contains("retry")
2517 }
2518 DataError::Io(_) => true,
2519 _ => false,
2520 }
2521}
2522
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Falls back to 0 if the system clock reports a pre-epoch time.
fn now_unix_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}
2529
/// Converts a [`SystemTime`] into milliseconds since the Unix epoch,
/// returning `None` for times before the epoch.
fn system_time_to_unix_ms(value: SystemTime) -> Option<u64> {
    match value.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => Some(elapsed.as_millis() as u64),
        Err(_) => None,
    }
}
2536
/// Returns the next value of a process-wide run counter, starting at 1.
fn next_counter() -> u64 {
    // Relaxed ordering suffices: callers only need uniqueness of the value,
    // not ordering relative to other memory operations.
    static COUNTER: AtomicU64 = AtomicU64::new(1);
    COUNTER.fetch_add(1, Ordering::Relaxed)
}
2541
2542pub fn pipeline_records_to_json_lines(records: &[Row]) -> DataResult<String> {
2543 let mut out = String::new();
2544 for row in records {
2545 out.push_str(&serde_json::to_string(row)?);
2546 out.push('\n');
2547 }
2548 Ok(out)
2549}
2550
2551pub fn write_pipeline_records_to_file(path: &Path, records: &[Row]) -> DataResult<()> {
2552 let payload = pipeline_records_to_json_lines(records)?;
2553 fs::write(path, payload)?;
2554 Ok(())
2555}
2556
2557#[cfg(test)]
2558mod tests {
2559 use super::*;
2560 use crate::{
2561 InMemoryBigQueryAdapter, InMemoryClickHouseAdapter, InMemoryOpenSearchAdapter,
2562 InMemorySingleStoreAdapter, StoredRow,
2563 };
2564 use serde_json::json;
2565 use std::collections::VecDeque;
2566
2567 fn row(pairs: &[(&str, Value)]) -> Row {
2568 pairs
2569 .iter()
2570 .map(|(key, value)| ((*key).to_string(), value.clone()))
2571 .collect()
2572 }
2573
2574 #[derive(Debug, Clone)]
2575 struct SequenceSource {
2576 source_name: String,
2577 responses: Arc<Mutex<VecDeque<DataResult<PipelineBatch>>>>,
2578 }
2579
2580 impl SequenceSource {
2581 fn new(
2582 source_name: impl Into<String>,
2583 responses: impl IntoIterator<Item = DataResult<PipelineBatch>>,
2584 ) -> Self {
2585 Self {
2586 source_name: source_name.into(),
2587 responses: Arc::new(Mutex::new(responses.into_iter().collect())),
2588 }
2589 }
2590 }
2591
2592 impl Source for SequenceSource {
2593 fn source_name(&self) -> &str {
2594 &self.source_name
2595 }
2596
2597 fn read_batch(
2598 &self,
2599 _pipeline: &Pipeline,
2600 _checkpoint: Option<&str>,
2601 _context: &QueryContext,
2602 ) -> DataResult<PipelineBatch> {
2603 let mut guard = self
2604 .responses
2605 .lock()
2606 .map_err(|_| DataError::Integration("sequence source lock poisoned".to_string()))?;
2607 Ok(guard
2608 .pop_front()
2609 .transpose()?
2610 .unwrap_or_else(PipelineBatch::empty))
2611 }
2612 }
2613
    /// Test transform that rejects every batch with a validation error.
    #[derive(Debug, Clone)]
    struct AlwaysFailTransform {
        // Message carried in the `DataError::Validation` returned per batch.
        message: String,
    }

    impl Transform for AlwaysFailTransform {
        fn transform_name(&self) -> &str {
            "always-fail"
        }

        // Fails unconditionally so tests can exercise transform-error paths.
        fn transform_batch(
            &self,
            _pipeline: &Pipeline,
            _run: &PipelineRun,
            _records: Vec<Row>,
            _context: &QueryContext,
        ) -> DataResult<TransformBatch> {
            Err(DataError::Validation(self.message.clone()))
        }
    }
2634
    /// Test sink that rejects every write with a validation error.
    #[derive(Debug, Clone)]
    struct AlwaysFailSink {
        // Name reported via `Sink::sink_name`.
        sink_name: String,
        // Message carried in the `DataError::Validation` returned per write.
        message: String,
    }

    impl Sink for AlwaysFailSink {
        fn sink_name(&self) -> &str {
            &self.sink_name
        }

        // Fails unconditionally so tests can exercise sink-error paths.
        fn write_batch(
            &self,
            _pipeline: &Pipeline,
            _run: &PipelineRun,
            _records: &[Row],
            _context: &QueryContext,
        ) -> DataResult<SinkWriteResult> {
            Err(DataError::Validation(self.message.clone()))
        }
    }
2656
    #[test]
    fn pipeline_runtime_resume_checkpoint_and_idempotent_sink_prevents_duplicates() {
        // Three-row source; all three runs below share the same checkpoint
        // store and sink, so a paused run can be resumed by a fresh runtime.
        let source = Arc::new(InMemoryRecordSource::new(
            "in-memory",
            vec![
                row(&[("id", json!(1)), ("account", json!("Acme"))]),
                row(&[("id", json!(2)), ("account", json!("Globex"))]),
                row(&[("id", json!(3)), ("account", json!("Umbrella"))]),
            ],
        ));
        let sink = Arc::new(IdempotentInMemorySink::new("idempotent", "id"));
        let checkpoints = Arc::new(InMemoryCheckpointStore::default());
        let dead_letters = Arc::new(InMemoryDeadLetterStore::default());
        let events = Arc::new(InMemoryPipelineEventStore::default());
        let context = QueryContext::default().with_tenant_id("tenant-a");

        // Run 1: batch size 1 capped at one batch per run, so the run pauses
        // after writing a single row and persists a checkpoint.
        let first = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-resume",
                "Resume Pipeline",
                "source",
                "transform",
                "sink",
            )
            .with_batch_size(1)
            .with_max_batches_per_run(Some(1)),
            source.clone(),
            Arc::new(PassthroughTransform),
            sink.clone(),
            checkpoints.clone(),
            dead_letters.clone(),
            events.clone(),
        )
        .execute_manual(context.clone(), PipelineControl::default(), None)
        .unwrap();

        assert_eq!(first.status, PipelineRunStatus::Paused);
        assert_eq!(first.rows_written, 1);
        assert_eq!(sink.rows().unwrap().len(), 1);

        // Run 2: no batch cap — resumes from the stored checkpoint and
        // drains the remaining two rows to completion.
        let resumed = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-resume",
                "Resume Pipeline",
                "source",
                "transform",
                "sink",
            )
            .with_batch_size(1),
            source.clone(),
            Arc::new(PassthroughTransform),
            sink.clone(),
            checkpoints.clone(),
            dead_letters.clone(),
            events.clone(),
        )
        .execute_manual(context.clone(), PipelineControl::default(), None)
        .unwrap();

        assert_eq!(resumed.status, PipelineRunStatus::Succeeded);
        assert_eq!(sink.rows().unwrap().len(), 3);

        // Run 3: the source is already fully consumed, so the repeat run
        // writes nothing — the idempotent sink still holds exactly 3 rows.
        let repeat = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-resume",
                "Resume Pipeline",
                "source",
                "transform",
                "sink",
            )
            .with_batch_size(1),
            source,
            Arc::new(PassthroughTransform),
            sink.clone(),
            checkpoints,
            dead_letters,
            events,
        )
        .execute_manual(context, PipelineControl::default(), None)
        .unwrap();
        assert_eq!(repeat.rows_written, 0);
        assert_eq!(sink.rows().unwrap().len(), 3);
    }
2740
    /// Test transform that dead-letters records whose `status` field is the
    /// string "bad" and passes all other records through unchanged.
    #[derive(Debug, Clone)]
    struct StatusAwareTransform;

    impl Transform for StatusAwareTransform {
        fn transform_name(&self) -> &str {
            "status-aware"
        }

        fn transform_batch(
            &self,
            _pipeline: &Pipeline,
            _run: &PipelineRun,
            records: Vec<Row>,
            _context: &QueryContext,
        ) -> DataResult<TransformBatch> {
            let mut ok = Vec::new();
            let mut dead_letters = Vec::new();
            for record in records {
                // Only an explicit string status of "bad" is dead-lettered;
                // missing or non-string statuses pass through.
                if record
                    .get("status")
                    .and_then(Value::as_str)
                    .is_some_and(|status| status == "bad")
                {
                    dead_letters.push(PipelineDeadLetterInput {
                        stage: PipelineStage::Transform,
                        reason: "blocked status".to_string(),
                        record,
                    });
                } else {
                    ok.push(record);
                }
            }
            Ok(TransformBatch {
                records: ok,
                dead_letters,
            })
        }
    }
2779
    #[test]
    fn pipeline_runtime_routes_transform_dead_letters_and_tracks_counts() {
        // One of three source rows has status "bad" and should be routed to
        // the dead-letter store by StatusAwareTransform.
        let runtime = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-deadletter",
                "Deadletter Pipeline",
                "source",
                "status-aware",
                "sink",
            )
            .with_batch_size(10),
            Arc::new(InMemoryRecordSource::new(
                "in-memory",
                vec![
                    row(&[("id", json!(1)), ("status", json!("ok"))]),
                    row(&[("id", json!(2)), ("status", json!("bad"))]),
                    row(&[("id", json!(3)), ("status", json!("ok"))]),
                ],
            )),
            Arc::new(StatusAwareTransform),
            Arc::new(IdempotentInMemorySink::new("sink", "id")),
            Arc::new(InMemoryCheckpointStore::default()),
            Arc::new(InMemoryDeadLetterStore::default()),
            Arc::new(InMemoryPipelineEventStore::default()),
        );

        // A dead-lettered record does not fail the run; it is counted.
        let run = runtime
            .execute_manual(QueryContext::default(), PipelineControl::default(), None)
            .unwrap();
        assert_eq!(run.status, PipelineRunStatus::Succeeded);
        assert_eq!(run.rows_dead_letter, 1);

        // The operation snapshot exposes the dead letter with its reason.
        let snapshot = runtime.operation_snapshot(&run).unwrap();
        assert_eq!(snapshot.dead_letters.len(), 1);
        assert!(snapshot.dead_letters[0].reason.contains("blocked status"));
    }
2816
    #[test]
    fn pipeline_runtime_respects_operator_pause_and_cancel_controls() {
        let runtime = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-control",
                "Control Pipeline",
                "source",
                "transform",
                "sink",
            )
            .with_batch_size(1),
            Arc::new(InMemoryRecordSource::new(
                "in-memory",
                vec![
                    row(&[("id", json!(1))]),
                    row(&[("id", json!(2))]),
                    row(&[("id", json!(3))]),
                ],
            )),
            Arc::new(PassthroughTransform),
            Arc::new(IdempotentInMemorySink::new("sink", "id")),
            Arc::new(InMemoryCheckpointStore::default()),
            Arc::new(InMemoryDeadLetterStore::default()),
            Arc::new(InMemoryPipelineEventStore::default()),
        );

        // NOTE: pause()/cancel() work on a non-mut binding, so PipelineControl
        // evidently uses shared interior state.
        // A control paused before the run starts: the run pauses immediately,
        // before reading any rows.
        let paused_control = PipelineControl::default();
        paused_control.pause();
        let paused = runtime
            .execute_manual(QueryContext::default(), paused_control, None)
            .unwrap();
        assert_eq!(paused.status, PipelineRunStatus::Paused);
        assert_eq!(paused.rows_read, 0);

        // Likewise, a pre-canceled control yields a canceled run with no reads.
        let canceled_control = PipelineControl::default();
        canceled_control.cancel();
        let canceled = runtime
            .execute_manual(QueryContext::default(), canceled_control, None)
            .unwrap();
        assert_eq!(canceled.status, PipelineRunStatus::Canceled);
        assert_eq!(canceled.rows_read, 0);
    }
2859
    #[test]
    fn file_and_object_store_profiles_roundtrip_rows() {
        // Unique temp path per run (timestamp suffix) to avoid collisions.
        let tmp = std::env::temp_dir().join(format!("shelly_m60_file_{}.jsonl", now_unix_ms()));
        let records = vec![
            row(&[("id", json!(1)), ("account", json!("Acme"))]),
            row(&[("id", json!(2)), ("account", json!("Globex"))]),
        ];
        write_pipeline_records_to_file(&tmp, &records).unwrap();

        // File roundtrip: JSON Lines written above read back as a batch.
        let file_source = FileJsonLineSource::new("file-source", &tmp);
        let file_batch = file_source
            .read_batch(
                &Pipeline::new("file", "file", "file", "transform", "sink").with_batch_size(10),
                None,
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(file_batch.records.len(), 2);

        // Object-store roundtrip: rows stored as JSON values under one key.
        let object_store = InMemoryObjectStore::default();
        object_store
            .put(
                "accounts",
                records
                    .iter()
                    .map(|row| serde_json::to_value(row).unwrap())
                    .collect(),
            )
            .unwrap();
        let object_source =
            ObjectStoreJsonSource::new("object-source", object_store.clone(), "accounts");
        let object_batch = object_source
            .read_batch(
                &Pipeline::new("obj", "obj", "obj", "transform", "sink").with_batch_size(10),
                None,
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(object_batch.records.len(), 2);

        // Object-store sink: write the batch back under a different key and
        // confirm both rows land.
        let object_sink =
            ObjectStoreJsonSink::new("object-sink", object_store.clone(), "sink_rows", "id");
        let run = PipelineRun::new(
            "run-1".to_string(),
            "obj".to_string(),
            PipelineRunCommand::Manual { requested_by: None },
            &QueryContext::default(),
        );
        let write = object_sink
            .write_batch(
                &Pipeline::new("obj", "obj", "obj", "transform", "sink"),
                &run,
                &object_batch.records,
                &QueryContext::default(),
            )
            .unwrap();
        assert_eq!(write.written, 2);
        let stored = object_store.get("sink_rows").unwrap();
        assert_eq!(stored.len(), 2);

        // Best-effort cleanup of the temp file; failure is ignored.
        let _ = fs::remove_file(tmp);
    }
2922
    #[test]
    fn pipeline_builder_and_helper_paths_cover_edge_cases() {
        // Builder clamping: zero batch size / batch cap / deadline / attempts
        // are raised to 1, and max_backoff is raised to initial_backoff.
        let pipeline = Pipeline::new("pipeline-helpers", "Helpers", "source", "transform", "sink")
            .with_trigger(PipelineTrigger::EventDriven)
            .with_batch_size(0)
            .with_max_batches_per_run(Some(0))
            .with_deadline_ms(0)
            .with_retry_policy(RetryPolicy {
                max_attempts: 0,
                initial_backoff_ms: 10,
                max_backoff_ms: 1,
            })
            .with_metadata("region", "us-east-1");
        assert_eq!(pipeline.trigger, PipelineTrigger::EventDriven);
        assert_eq!(pipeline.batch_size, 1);
        assert_eq!(pipeline.max_batches_per_run, Some(1));
        assert_eq!(pipeline.deadline_ms, 1);
        assert_eq!(pipeline.retry_policy.max_attempts, 1);
        assert_eq!(pipeline.retry_policy.initial_backoff_ms, 10);
        assert_eq!(pipeline.retry_policy.max_backoff_ms, 10);
        assert_eq!(
            pipeline.metadata.get("region").map(String::as_str),
            Some("us-east-1")
        );

        // Command labels for each run-command variant.
        assert_eq!(
            PipelineRunCommand::Manual { requested_by: None }.as_label(),
            "manual"
        );
        assert_eq!(
            PipelineRunCommand::Scheduled {
                schedule_id: "cron".to_string()
            }
            .as_label(),
            "scheduled"
        );
        assert_eq!(
            PipelineRunCommand::Event {
                event: "ingest".to_string(),
                payload: json!({"ok": true}),
            }
            .as_label(),
            "event"
        );

        // Run lifecycle: starts queued; finish() sets status and timestamp.
        let mut run = PipelineRun::new(
            "run-helpers".to_string(),
            pipeline.id.clone(),
            PipelineRunCommand::Manual {
                requested_by: Some("operator".to_string()),
            },
            &QueryContext::default().with_tenant_id("tenant-a"),
        );
        assert_eq!(run.status, PipelineRunStatus::Queued);
        run.finish(PipelineRunStatus::Succeeded);
        assert_eq!(run.status, PipelineRunStatus::Succeeded);
        assert!(run.finished_at_unix_ms.is_some());

        // Batch helpers: empty() and passthrough() constructors.
        let empty_batch = PipelineBatch::empty();
        assert!(empty_batch.records.is_empty());
        assert!(!empty_batch.has_more);
        let passthrough = TransformBatch::passthrough(vec![row(&[("id", json!(1))])]);
        assert_eq!(passthrough.records.len(), 1);
        assert!(passthrough.dead_letters.is_empty());

        // Control toggles: pause/resume/cancel via shared interior state.
        let control = PipelineControl::default();
        assert!(!control.is_paused());
        assert!(!control.is_canceled());
        control.pause();
        assert!(control.is_paused());
        control.resume();
        assert!(!control.is_paused());
        control.cancel();
        assert!(control.is_canceled());

        // Checkpoint parsing: bare int, embedded offset=, and fallbacks to 0.
        assert_eq!(parse_checkpoint_offset(None), 0);
        assert_eq!(parse_checkpoint_offset(Some("")), 0);
        assert_eq!(parse_checkpoint_offset(Some("7")), 7);
        assert_eq!(parse_checkpoint_offset(Some("query:offset=9:limit=1")), 9);
        assert_eq!(parse_checkpoint_offset(Some("offset=abc")), 0);

        // Retry classification: transient/unavailable messages and I/O errors
        // are retryable; validation errors are not.
        assert!(is_retryable_pipeline_error(&DataError::Integration(
            "transient backend timeout".to_string()
        )));
        assert!(is_retryable_pipeline_error(&DataError::Query(
            "service unavailable".to_string()
        )));
        assert!(is_retryable_pipeline_error(&DataError::Io(
            std::io::Error::other("disk busy")
        )));
        assert!(!is_retryable_pipeline_error(&DataError::Validation(
            "invalid payload".to_string()
        )));

        // JSON Lines rendering: one line per row, newline-terminated.
        let lines = pipeline_records_to_json_lines(&[
            row(&[("id", json!(1)), ("name", json!("Acme"))]),
            row(&[("id", json!(2)), ("name", json!("Globex"))]),
        ])
        .unwrap();
        assert!(lines.ends_with('\n'));
        assert_eq!(lines.lines().count(), 2);
    }
3025
    #[test]
    fn in_memory_pipeline_stores_cover_crud_paths() {
        // Checkpoint store: save, load, clear for a (pipeline, tenant) pair.
        let checkpoints = InMemoryCheckpointStore::default();
        checkpoints
            .save_checkpoint("pipeline-a", Some("tenant-a"), "11")
            .unwrap();
        assert_eq!(
            checkpoints
                .load_checkpoint("pipeline-a", Some("tenant-a"))
                .unwrap()
                .as_deref(),
            Some("11")
        );
        checkpoints
            .clear_checkpoint("pipeline-a", Some("tenant-a"))
            .unwrap();
        assert!(checkpoints
            .load_checkpoint("pipeline-a", Some("tenant-a"))
            .unwrap()
            .is_none());

        // Dead-letter store: entries are scoped per run_id on listing.
        let dead_letters = InMemoryDeadLetterStore::default();
        dead_letters
            .push_dead_letter(PipelineDeadLetter {
                run_id: "run-1".to_string(),
                pipeline_id: "pipeline-a".to_string(),
                stage: PipelineStage::Sink,
                reason: "duplicate".to_string(),
                record: row(&[("id", json!(1))]),
                checkpoint: Some("1".to_string()),
                attempt: 1,
                timestamp_unix_ms: now_unix_ms(),
            })
            .unwrap();
        dead_letters
            .push_dead_letter(PipelineDeadLetter {
                run_id: "run-2".to_string(),
                pipeline_id: "pipeline-a".to_string(),
                stage: PipelineStage::Transform,
                reason: "invalid".to_string(),
                record: row(&[("id", json!(2))]),
                checkpoint: Some("2".to_string()),
                attempt: 2,
                timestamp_unix_ms: now_unix_ms(),
            })
            .unwrap();
        let run1_letters = dead_letters.list_dead_letters("run-1").unwrap();
        assert_eq!(run1_letters.len(), 1);
        assert_eq!(run1_letters[0].reason, "duplicate");

        // Event store: events are likewise listed per run_id.
        let events = InMemoryPipelineEventStore::default();
        events
            .append_event(PipelineEvent {
                run_id: "run-1".to_string(),
                pipeline_id: "pipeline-a".to_string(),
                timestamp_unix_ms: now_unix_ms(),
                kind: PipelineEventKind::RunStarted,
                message: "started".to_string(),
                tenant_id: Some("tenant-a".to_string()),
                trace_id: None,
                correlation_id: Some("corr-1".to_string()),
                request_id: Some("req-1".to_string()),
                metrics: BTreeMap::from_iter([("rows_read".to_string(), 0u64)]),
                metadata: BTreeMap::from_iter([("source".to_string(), "in-memory".to_string())]),
            })
            .unwrap();
        events
            .append_event(PipelineEvent {
                run_id: "run-2".to_string(),
                pipeline_id: "pipeline-a".to_string(),
                timestamp_unix_ms: now_unix_ms(),
                kind: PipelineEventKind::RunSucceeded,
                message: "done".to_string(),
                tenant_id: Some("tenant-a".to_string()),
                trace_id: None,
                correlation_id: Some("corr-2".to_string()),
                request_id: Some("req-2".to_string()),
                metrics: BTreeMap::new(),
                metadata: BTreeMap::new(),
            })
            .unwrap();
        let run1_events = events.list_events("run-1").unwrap();
        assert_eq!(run1_events.len(), 1);
        assert_eq!(run1_events[0].kind, PipelineEventKind::RunStarted);
    }
3111
    #[test]
    fn source_profiles_cover_missing_and_invalid_payload_paths() {
        let pipeline = Pipeline::new("source-checks", "Source Checks", "source", "t", "sink")
            .with_batch_size(2);

        // Missing file: reads succeed with an empty batch, and the incoming
        // offset checkpoint ("offset=3") is preserved as "3".
        let missing_file =
            std::env::temp_dir().join(format!("shelly_missing_{}.jsonl", now_unix_ms()));
        let missing_source = FileJsonLineSource::new("missing-file", &missing_file);
        let missing_batch = missing_source
            .read_batch(&pipeline, Some("offset=3"), &QueryContext::default())
            .unwrap();
        assert!(missing_batch.records.is_empty());
        assert_eq!(missing_batch.next_checkpoint.as_deref(), Some("3"));
        assert_eq!(Source::source_name(&missing_source), "missing-file");

        // Non-object JSON line (an array) is rejected with a typed error.
        let invalid_file =
            std::env::temp_dir().join(format!("shelly_invalid_{}.jsonl", now_unix_ms()));
        fs::write(&invalid_file, "[1,2,3]\n").unwrap();
        let invalid_source = FileJsonLineSource::new("invalid-file", &invalid_file);
        let err = invalid_source
            .read_batch(&pipeline, None, &QueryContext::default())
            .unwrap_err()
            .to_string();
        assert!(err.contains("expected JSON object row"));
        let _ = fs::remove_file(&invalid_file);

        // Same rejection for non-object values stored in the object store.
        let object_store = InMemoryObjectStore::default();
        object_store
            .put("bad_rows", vec![json!(["not-an-object"])])
            .unwrap();
        let object_source = ObjectStoreJsonSource::new("object-source", object_store, "bad_rows");
        let err = object_source
            .read_batch(&pipeline, None, &QueryContext::default())
            .unwrap_err()
            .to_string();
        assert!(err.contains("expected JSON object rows"));
    }
3149
    #[test]
    fn enterprise_pipeline_adapters_cover_success_and_error_paths() {
        // Shared fixtures: two SQL rollup rows and two search hits.
        let sql_rows = vec![
            row(&[("tenant", json!("north")), ("active_accounts", json!(42))]),
            row(&[("tenant", json!("south")), ("active_accounts", json!(27))]),
        ];
        let search_rows = vec![
            StoredRow {
                id: 1,
                data: row(&[("account", json!("Acme")), ("status", json!("renewal"))]),
            },
            StoredRow {
                id: 2,
                data: row(&[("account", json!("Globex")), ("status", json!("new"))]),
            },
        ];
        let pipeline = Pipeline::new("enterprise", "Enterprise", "source", "transform", "sink")
            .with_batch_size(2);
        let context = QueryContext::default()
            .with_tenant_id("tenant-a")
            .with_correlation_id("corr-a")
            .with_request_id("req-a");
        let query = SqlCommand::new(
            "SELECT tenant, active_accounts FROM tenant_rollup",
            Vec::new(),
        );

        // --- Source happy paths: each adapter-backed source yields both rows.
        let singlestore_source = SingleStorePipelineSource::new(
            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
            query.clone(),
            "tenant_rollup",
        )
        .with_contract(AdapterCallContract::default_query().with_timeout_ms(5_000))
        .with_artificial_delay_ms(0);
        let batch = singlestore_source
            .read_batch(&pipeline, Some("offset=0"), &context)
            .unwrap();
        assert_eq!(batch.records.len(), 2);
        assert_eq!(Source::source_name(&singlestore_source), "singlestore");

        let clickhouse_source = ClickHousePipelineSource::new(
            Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
            query.clone(),
            "tenant_rollup",
        )
        .with_contract(AdapterCallContract::low_latency().with_timeout_ms(1_000))
        .with_artificial_delay_ms(0);
        let batch = clickhouse_source
            .read_batch(&pipeline, Some("offset=0"), &context)
            .unwrap();
        assert_eq!(batch.records.len(), 2);

        let bigquery_source = BigQueryPipelineSource::new(
            Arc::new(InMemoryBigQueryAdapter::new(sql_rows.clone())),
            query.clone(),
            "tenant_rollup",
        )
        .with_contract(AdapterCallContract::default_query())
        .with_artificial_delay_ms(0);
        let batch = bigquery_source
            .read_batch(&pipeline, Some("offset=0"), &context)
            .unwrap();
        assert_eq!(batch.records.len(), 2);

        let opensearch_source = OpenSearchPipelineSource::new(
            Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
            SearchRequest::new("accounts", ""),
        )
        .with_contract(AdapterCallContract::default_query())
        .with_artificial_delay_ms(0);
        let batch = opensearch_source
            .read_batch(&pipeline, Some("offset=0"), &context)
            .unwrap();
        assert_eq!(batch.records.len(), 2);

        // --- Source validation errors: empty SQL / empty search index.
        let err = SingleStorePipelineSource::new(
            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
            SqlCommand::new("", Vec::new()),
            "tenant_rollup",
        )
        .read_batch(&pipeline, None, &context)
        .unwrap_err()
        .to_string();
        assert!(err.contains("empty SQL statement"));

        let err = OpenSearchPipelineSource::new(
            Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
            SearchRequest::new("", ""),
        )
        .read_batch(&pipeline, None, &context)
        .unwrap_err()
        .to_string();
        assert!(err.contains("search index must not be empty"));

        // --- Sink paths: three records with one duplicate id, so each sink
        // should report 2 writes (and the first one 1 duplicate).
        let run = PipelineRun::new(
            "run-1".to_string(),
            "enterprise".to_string(),
            PipelineRunCommand::Manual { requested_by: None },
            &context,
        );
        let records = vec![
            row(&[("id", json!(1)), ("account", json!("Acme"))]),
            row(&[("id", json!(1)), ("account", json!("Acme"))]),
            row(&[("id", json!(2)), ("account", json!("Globex"))]),
        ];

        let singlestore_sink = SingleStorePipelineSink::new(
            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
            "id",
        )
        .with_probe_statement("SELECT 1")
        .with_contract(AdapterCallContract::default_query());
        let write = singlestore_sink
            .write_batch(&pipeline, &run, &records, &context)
            .unwrap();
        assert_eq!(write.written, 2);
        assert_eq!(write.duplicate_writes, 1);
        assert_eq!(Sink::sink_name(&singlestore_sink), "singlestore");
        assert!(format!("{singlestore_sink:?}").contains("SingleStorePipelineSink"));

        // An empty probe statement is rejected before any write happens.
        let err = SingleStorePipelineSink::new(
            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
            "id",
        )
        .with_probe_statement("")
        .write_batch(&pipeline, &run, &records, &context)
        .unwrap_err()
        .to_string();
        assert!(err.contains("empty SQL statement"));

        let clickhouse_sink = ClickHousePipelineSink::new(
            Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
            "id",
        );
        let write = clickhouse_sink
            .write_batch(&pipeline, &run, &records, &context)
            .unwrap();
        assert_eq!(write.written, 2);
        assert_eq!(Sink::sink_name(&clickhouse_sink), "clickhouse");
        assert!(format!("{clickhouse_sink:?}").contains("ClickHousePipelineSink"));

        let bigquery_sink =
            BigQueryPipelineSink::new(Arc::new(InMemoryBigQueryAdapter::new(sql_rows)), "id");
        let write = bigquery_sink
            .write_batch(&pipeline, &run, &records, &context)
            .unwrap();
        assert_eq!(write.written, 2);
        assert_eq!(Sink::sink_name(&bigquery_sink), "bigquery");
        assert!(format!("{bigquery_sink:?}").contains("BigQueryPipelineSink"));

        let opensearch_sink = OpenSearchPipelineSink::new(
            Arc::new(InMemoryOpenSearchAdapter::new(search_rows.clone())),
            "id",
            "accounts",
        );
        let write = opensearch_sink
            .write_batch(&pipeline, &run, &records, &context)
            .unwrap();
        assert_eq!(write.written, 2);
        assert_eq!(Sink::sink_name(&opensearch_sink), "opensearch");
        assert!(format!("{opensearch_sink:?}").contains("OpenSearchPipelineSink"));

        // An empty target index on the search sink is likewise rejected.
        let err = OpenSearchPipelineSink::new(
            Arc::new(InMemoryOpenSearchAdapter::new(search_rows)),
            "id",
            "",
        )
        .write_batch(&pipeline, &run, &records, &context)
        .unwrap_err()
        .to_string();
        assert!(err.contains("search index must not be empty"));
    }
3322
3323 #[test]
3324 fn conformance_report_tracks_failures() {
3325 let mut report = PipelineConformanceReport::default();
3326 report.add_pass("checkpoint_resume", "ok");
3327 assert!(report.passed());
3328 report.add_fail("retry", "transient error");
3329 assert!(!report.passed());
3330 assert_eq!(report.checks.len(), 2);
3331 assert!(!report.checks[1].passed);
3332 }
3333
    #[test]
    fn pipeline_adapter_conformance_suite_passes_with_reference_adapters() {
        // Reference in-memory adapters for all four enterprise backends.
        let sql_rows = vec![
            row(&[("tenant", json!("north")), ("active_accounts", json!(42))]),
            row(&[("tenant", json!("south")), ("active_accounts", json!(27))]),
        ];
        let search_rows = vec![
            StoredRow {
                id: 1,
                data: row(&[("account", json!("Acme")), ("status", json!("renewal"))]),
            },
            StoredRow {
                id: 2,
                data: row(&[("account", json!("Globex")), ("status", json!("new"))]),
            },
        ];

        let report = run_pipeline_adapter_conformance_suite(
            Arc::new(InMemorySingleStoreAdapter::new(sql_rows.clone())),
            Arc::new(InMemoryClickHouseAdapter::new(sql_rows.clone())),
            Arc::new(InMemoryBigQueryAdapter::new(sql_rows)),
            Arc::new(InMemoryOpenSearchAdapter::new(search_rows)),
            &QueryContext::default()
                .with_tenant_id("tenant-a")
                .with_correlation_id("m60-correlation")
                .with_request_id("m60-request"),
        );

        assert!(
            report.passed(),
            "expected conformance suite to pass, got {:?}",
            report
        );
        // Each probe family (timeout, retry, checkpoint_resume) must have
        // produced at least one named check in the report.
        assert!(report
            .checks
            .iter()
            .any(|check| check.name.ends_with(".timeout")));
        assert!(report
            .checks
            .iter()
            .any(|check| check.name.ends_with(".retry")));
        assert!(report
            .checks
            .iter()
            .any(|check| check.name.ends_with(".checkpoint_resume")));
    }
3380
    #[test]
    fn pipeline_runtime_command_wrappers_execute_scheduled_and_event() {
        // Two scripted empty batches: one per wrapper invocation below.
        let runtime = PipelineRuntime::new(
            Pipeline::new(
                "pipeline-command-wrappers",
                "Pipeline Command Wrappers",
                "sequence",
                "passthrough",
                "sink",
            ),
            Arc::new(SequenceSource::new(
                "sequence",
                [Ok(PipelineBatch::empty()), Ok(PipelineBatch::empty())],
            )),
            Arc::new(PassthroughTransform),
            Arc::new(IdempotentInMemorySink::new("sink", "id")),
            Arc::new(InMemoryCheckpointStore::default()),
            Arc::new(InMemoryDeadLetterStore::default()),
            Arc::new(InMemoryPipelineEventStore::default()),
        );
        let context = QueryContext::default().with_tenant_id("tenant-schedule");

        // execute_scheduled should tag the run with a Scheduled command.
        let scheduled = runtime
            .execute_scheduled(
                context.clone(),
                PipelineControl::default(),
                "schedule-nightly",
            )
            .unwrap();
        assert_eq!(scheduled.status, PipelineRunStatus::Succeeded);
        assert!(matches!(
            scheduled.command,
            PipelineRunCommand::Scheduled { .. }
        ));

        // execute_event should tag the run with an Event command.
        let event = runtime
            .execute_event(
                context,
                PipelineControl::default(),
                "ingest",
                json!({"reason": "manual"}),
            )
            .unwrap();
        assert_eq!(event.status, PipelineRunStatus::Succeeded);
        assert!(matches!(event.command, PipelineRunCommand::Event { .. }));
    }
3427
3428 #[test]
3429 fn pipeline_runtime_covers_source_error_and_deadline_paths() {
3430 let source_failure_runtime = PipelineRuntime::new(
3431 Pipeline::new(
3432 "pipeline-source-failure",
3433 "Pipeline Source Failure",
3434 "sequence",
3435 "passthrough",
3436 "sink",
3437 )
3438 .with_retry_policy(RetryPolicy::never()),
3439 Arc::new(SequenceSource::new(
3440 "sequence",
3441 [Err(DataError::Validation(
3442 "source validation failure".to_string(),
3443 ))],
3444 )),
3445 Arc::new(PassthroughTransform),
3446 Arc::new(IdempotentInMemorySink::new("sink", "id")),
3447 Arc::new(InMemoryCheckpointStore::default()),
3448 Arc::new(InMemoryDeadLetterStore::default()),
3449 Arc::new(InMemoryPipelineEventStore::default()),
3450 );
3451 let source_failed = source_failure_runtime
3452 .execute_manual(QueryContext::default(), PipelineControl::default(), None)
3453 .unwrap();
3454 assert_eq!(source_failed.status, PipelineRunStatus::Failed);
3455 assert!(source_failed
3456 .errors
3457 .iter()
3458 .any(|error| error.contains("source validation failure")));
3459
3460 let deadline_runtime = PipelineRuntime::new(
3461 Pipeline::new(
3462 "pipeline-deadline",
3463 "Pipeline Deadline",
3464 "sequence",
3465 "passthrough",
3466 "sink",
3467 )
3468 .with_batch_size(1)
3469 .with_deadline_ms(1),
3470 Arc::new(SequenceSource::new(
3471 "sequence",
3472 [Ok(PipelineBatch {
3473 records: vec![row(&[("id", json!(1))])],
3474 next_checkpoint: Some("1".to_string()),
3475 has_more: true,
3476 lineage: Some("deadline-test".to_string()),
3477 source_freshness_unix_ms: Some(now_unix_ms()),
3478 })],
3479 )),
3480 Arc::new(PassthroughTransform),
3481 Arc::new(
3482 IdempotentInMemorySink::new("sink", "id").with_probe(Arc::new(|_context| {
3483 thread::sleep(Duration::from_millis(5));
3484 Ok(())
3485 })),
3486 ),
3487 Arc::new(InMemoryCheckpointStore::default()),
3488 Arc::new(InMemoryDeadLetterStore::default()),
3489 Arc::new(InMemoryPipelineEventStore::default()),
3490 );
3491 let deadline_failed = deadline_runtime
3492 .execute_manual(QueryContext::default(), PipelineControl::default(), None)
3493 .unwrap();
3494 assert_eq!(deadline_failed.status, PipelineRunStatus::Failed);
3495 assert!(deadline_failed
3496 .errors
3497 .iter()
3498 .any(|error| error.contains("deadline exceeded")));
3499 }
3500
3501 #[test]
3502 fn pipeline_runtime_covers_transform_and_sink_failure_paths() {
3503 let transform_failure_source = Arc::new(SequenceSource::new(
3504 "sequence",
3505 [Ok(PipelineBatch {
3506 records: vec![
3507 row(&[("id", json!(1)), ("tenant", json!("a"))]),
3508 row(&[("id", json!(2)), ("tenant", json!("b"))]),
3509 ],
3510 next_checkpoint: Some("2".to_string()),
3511 has_more: false,
3512 lineage: Some("transform-failure".to_string()),
3513 source_freshness_unix_ms: Some(now_unix_ms()),
3514 })],
3515 ));
3516 let transform_failure_dead_letters = Arc::new(InMemoryDeadLetterStore::default());
3517 let transform_failure_runtime = PipelineRuntime::new(
3518 Pipeline::new(
3519 "pipeline-transform-failure",
3520 "Pipeline Transform Failure",
3521 "sequence",
3522 "always-fail",
3523 "sink",
3524 ),
3525 transform_failure_source,
3526 Arc::new(AlwaysFailTransform {
3527 message: "transform denied row".to_string(),
3528 }),
3529 Arc::new(IdempotentInMemorySink::new("sink", "id")),
3530 Arc::new(InMemoryCheckpointStore::default()),
3531 transform_failure_dead_letters.clone(),
3532 Arc::new(InMemoryPipelineEventStore::default()),
3533 );
3534 let transform_failed = transform_failure_runtime
3535 .execute_manual(QueryContext::default(), PipelineControl::default(), None)
3536 .unwrap();
3537 assert_eq!(transform_failed.status, PipelineRunStatus::Failed);
3538 assert_eq!(transform_failed.rows_dead_letter, 2);
3539 assert_eq!(transform_failed.last_checkpoint.as_deref(), Some("2"));
3540 assert!(transform_failed
3541 .errors
3542 .iter()
3543 .any(|error| error.contains("transform denied row")));
3544 let transform_snapshot = transform_failure_runtime
3545 .operation_snapshot(&transform_failed)
3546 .unwrap();
3547 assert_eq!(transform_snapshot.dead_letters.len(), 2);
3548 assert!(transform_snapshot
3549 .dead_letters
3550 .iter()
3551 .all(|dead_letter| dead_letter.stage == PipelineStage::Transform));
3552 assert_eq!(
3553 transform_failure_dead_letters
3554 .list_dead_letters(&transform_failed.id)
3555 .unwrap()
3556 .len(),
3557 2
3558 );
3559
3560 let sink_failure_source = Arc::new(SequenceSource::new(
3561 "sequence",
3562 [Ok(PipelineBatch {
3563 records: vec![
3564 row(&[("id", json!(10)), ("tenant", json!("west"))]),
3565 row(&[("id", json!(11)), ("tenant", json!("east"))]),
3566 ],
3567 next_checkpoint: Some("2".to_string()),
3568 has_more: false,
3569 lineage: Some("sink-failure".to_string()),
3570 source_freshness_unix_ms: Some(now_unix_ms()),
3571 })],
3572 ));
3573 let sink_failure_runtime = PipelineRuntime::new(
3574 Pipeline::new(
3575 "pipeline-sink-failure",
3576 "Pipeline Sink Failure",
3577 "sequence",
3578 "passthrough",
3579 "always-fail",
3580 ),
3581 sink_failure_source,
3582 Arc::new(PassthroughTransform),
3583 Arc::new(AlwaysFailSink {
3584 sink_name: "always-fail".to_string(),
3585 message: "sink rejected rows".to_string(),
3586 }),
3587 Arc::new(InMemoryCheckpointStore::default()),
3588 Arc::new(InMemoryDeadLetterStore::default()),
3589 Arc::new(InMemoryPipelineEventStore::default()),
3590 );
3591 let sink_run = sink_failure_runtime
3592 .execute_manual(QueryContext::default(), PipelineControl::default(), None)
3593 .unwrap();
3594 assert_eq!(sink_run.status, PipelineRunStatus::Succeeded);
3595 assert_eq!(sink_run.rows_written, 0);
3596 assert_eq!(sink_run.rows_dead_letter, 2);
3597 assert!(sink_run
3598 .errors
3599 .iter()
3600 .any(|error| error.contains("sink rejected rows")));
3601 let sink_snapshot = sink_failure_runtime.operation_snapshot(&sink_run).unwrap();
3602 assert_eq!(sink_snapshot.dead_letters.len(), 2);
3603 assert!(sink_snapshot
3604 .dead_letters
3605 .iter()
3606 .all(|dead_letter| dead_letter.stage == PipelineStage::Sink));
3607 }
3608}