1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use serde_json::Value as JsonValue;
6use sqlx::AnyPool;
7use sqlx::any::AnyPoolOptions;
8use sqlx::any::AnyRow;
9use tokio::sync::OnceCell;
10use tracing::{debug, error, info, warn};
11
12use camel_component_api::{Body, CamelError, Exchange, Message};
13use camel_component_api::{ConcurrencyModel, Consumer, ConsumerContext};
14
15use crate::config::{SqlEndpointConfig, enrich_db_url_with_ssl, redact_db_url};
16use crate::headers;
17use crate::query::{QueryTemplate, parse_query_template, resolve_params};
18use crate::utils::{bind_json_values, row_to_json};
19
20pub struct SqlConsumer {
21 pub(crate) config: SqlEndpointConfig,
22 pub(crate) pool: Arc<OnceCell<AnyPool>>,
23 stopped: bool,
24}
25
26impl SqlConsumer {
27 pub fn new(config: SqlEndpointConfig, pool: Arc<OnceCell<AnyPool>>) -> Self {
28 Self {
29 config,
30 pool,
31 stopped: false,
32 }
33 }
34
35 async fn poll_database(
37 &self,
38 pool: &AnyPool,
39 context: &ConsumerContext,
40 template: &QueryTemplate,
41 ) -> Result<(), CamelError> {
42 let empty_exchange = Exchange::new(Message::default());
44
45 let prepared = resolve_params(template, &empty_exchange, &self.config.in_separator)?;
47
48 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
50 let rows: Vec<AnyRow> = query
51 .fetch_all(pool)
52 .await
53 .map_err(|e| CamelError::ProcessorError(format!("Query execution failed: {}", e)))?;
54
55 if rows.is_empty() && !self.config.route_empty_result_set {
57 return Ok(());
58 }
59
60 let rows_to_process: Vec<AnyRow> = if let Some(max) = self.config.max_messages_per_poll {
62 if max > 0 {
63 rows.into_iter().take(max as usize).collect()
64 } else {
65 rows
66 }
67 } else {
68 rows
69 };
70
71 if self.config.use_iterator {
72 for row in rows_to_process {
74 let row_json = row_to_json(&row)?;
75
76 let mut msg = Message::new(Body::Json(row_json.clone()));
78
79 if let Some(obj) = row_json.as_object() {
81 for (key, value) in obj {
82 msg.set_header(format!("CamelSql.{}", key), value.clone());
83 }
84 }
85
86 let exchange = Exchange::new(msg);
87
88 let result = context.send_and_wait(exchange).await;
90
91 if let Err(e) = self.handle_post_processing(pool, &result, &row_json).await {
93 error!(error = %e, "Post-processing failed");
94 if self.config.break_batch_on_consume_fail {
95 return Err(e);
96 }
97 }
98
99 if let Err(ref consume_err) = result
101 && self.config.break_batch_on_consume_fail
102 {
103 return Err(consume_err.clone());
104 }
105 }
106 } else {
107 let rows_json: Vec<JsonValue> = rows_to_process
109 .iter()
110 .map(row_to_json)
111 .collect::<Result<Vec<_>, CamelError>>()?;
112
113 let row_count = rows_json.len();
114
115 let mut msg = Message::new(Body::Json(JsonValue::Array(rows_json.clone())));
117 msg.set_header(headers::ROW_COUNT, JsonValue::Number(row_count.into()));
118
119 let exchange = Exchange::new(msg);
120
121 let result = context.send_and_wait(exchange).await;
123
124 for row_json in rows_json.iter() {
128 if let Err(e) = self.handle_post_processing(pool, &result, row_json).await {
129 error!(error = %e, "Post-processing failed for batch row");
130 if self.config.break_batch_on_consume_fail {
131 return Err(e);
132 }
133 }
134 }
135
136 if let Err(ref consume_err) = result
138 && self.config.break_batch_on_consume_fail
139 {
140 return Err(consume_err.clone());
141 }
142 }
143
144 if let Some(ref batch_query) = self.config.on_consume_batch_complete
146 && let Err(e) = self
147 .execute_post_query(pool, batch_query, &JsonValue::Null)
148 .await
149 {
150 error!(error = %e, "onConsumeBatchComplete query failed");
151 }
152
153 Ok(())
154 }
155
156 async fn handle_post_processing(
158 &self,
159 pool: &AnyPool,
160 result: &Result<Exchange, CamelError>,
161 row_json: &JsonValue,
162 ) -> Result<(), CamelError> {
163 match result {
164 Ok(_) => {
165 if let Some(ref on_consume) = self.config.on_consume {
167 self.execute_post_query(pool, on_consume, row_json).await?;
168 }
169 }
170 Err(_) => {
171 if let Some(ref on_consume_failed) = self.config.on_consume_failed {
173 self.execute_post_query(pool, on_consume_failed, row_json)
174 .await?;
175 }
176 }
177 }
178 Ok(())
179 }
180
181 async fn execute_post_query(
183 &self,
184 pool: &AnyPool,
185 query_str: &str,
186 row_json: &JsonValue,
187 ) -> Result<(), CamelError> {
188 let template = parse_query_template(query_str, self.config.placeholder)?;
190
191 let mut temp_msg = Message::new(Body::Json(row_json.clone()));
194 if let Some(obj) = row_json.as_object() {
195 for (key, value) in obj {
196 temp_msg.set_header(format!("CamelSql.{}", key), value.clone());
197 }
198 }
199 let temp_exchange = Exchange::new(temp_msg);
200
201 let prepared = resolve_params(&template, &temp_exchange, &self.config.in_separator)?;
203
204 let query = bind_json_values(sqlx::query(&prepared.sql), &prepared.bindings);
206 let result = query.execute(pool).await.map_err(|e| {
207 CamelError::ProcessorError(format!("Post-query execution failed: {}", e))
208 })?;
209
210 if result.rows_affected() == 0 {
212 warn!(
213 query = query_str,
214 "Post-processing query affected 0 rows — the row may not have been marked correctly"
215 );
216 }
217
218 Ok(())
219 }
220}
221
222#[async_trait]
223impl Consumer for SqlConsumer {
224 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
225 if self.stopped {
227 return Err(CamelError::Config(
228 "SQL consumer cannot be restarted after stop".into(),
229 ));
230 }
231
232 let pool = self
234 .pool
235 .get_or_try_init(|| async {
236 self.config.resolve_defaults();
238
239 sqlx::any::install_default_drivers();
242 let db_url = enrich_db_url_with_ssl(&self.config.db_url, &self.config)?;
243
244 let max_conn = self.config.max_connections.ok_or_else(|| {
245 CamelError::Config("max_connections not resolved for SQL consumer pool".into())
246 })?;
247 let min_conn = self.config.min_connections.ok_or_else(|| {
248 CamelError::Config("min_connections not resolved for SQL consumer pool".into())
249 })?;
250 let idle_timeout = self.config.idle_timeout_secs.ok_or_else(|| {
251 CamelError::Config(
252 "idle_timeout_secs not resolved for SQL consumer pool".into(),
253 )
254 })?;
255 let max_lifetime = self.config.max_lifetime_secs.ok_or_else(|| {
256 CamelError::Config(
257 "max_lifetime_secs not resolved for SQL consumer pool".into(),
258 )
259 })?;
260
261 info!(
262 db_url = %redact_db_url(&self.config.db_url),
263 "SQL consumer pool initializing"
264 );
265 AnyPoolOptions::new()
266 .max_connections(max_conn)
267 .min_connections(min_conn)
268 .idle_timeout(Duration::from_secs(idle_timeout))
269 .max_lifetime(Duration::from_secs(max_lifetime))
270 .connect(&db_url)
271 .await
272 .map_err(|e| {
273 CamelError::EndpointCreationFailed(format!(
274 "Failed to connect to database: {}",
275 e
276 ))
277 })
278 })
279 .await?;
280
281 if self.config.on_consume.is_none() {
283 warn!(
284 "SQL consumer started without onConsume configured — consumed rows will not be marked/deleted"
285 );
286 }
287
288 info!(
289 db_url = %redact_db_url(&self.config.db_url),
290 query_len = self.config.query.len(),
291 "SQL consumer started"
292 );
293
294 let template = parse_query_template(&self.config.query, self.config.placeholder)
296 .map_err(|e| CamelError::Config(format!("Invalid query template: {}", e)))?;
297
298 if self.config.initial_delay_ms > 0 {
300 tokio::select! {
301 _ = context.cancelled() => {
302 info!("SQL consumer stopped during initial delay");
303 return Ok(());
304 }
305 _ = tokio::time::sleep(Duration::from_millis(self.config.initial_delay_ms)) => {}
306 }
307 }
308
309 loop {
311 tokio::select! {
312 _ = context.cancelled() => {
313 info!("SQL consumer stopped");
314 break;
315 }
316 _ = tokio::time::sleep(Duration::from_millis(self.config.delay_ms)) => {
317 if let Err(e) = self.poll_database(pool, &context, &template).await {
318 error!(error = %e, "SQL consumer poll failed");
319 }
320 }
321 }
322 }
323
324 Ok(())
325 }
326
327 async fn stop(&mut self) -> Result<(), CamelError> {
328 if self.stopped {
330 debug!("SQL consumer stop called on already-stopped consumer");
331 return Ok(());
332 }
333
334 if let Some(pool) = self.pool.get() {
336 debug!("SQL consumer closing connection pool");
337 pool.close().await;
338 debug!("SQL consumer pool closed");
339 }
340
341 self.stopped = true;
342 info!("SQL consumer stopped");
343 Ok(())
344 }
345
346 fn concurrency_model(&self) -> ConcurrencyModel {
347 ConcurrencyModel::Sequential
351 }
352}
353
354#[cfg(test)]
355mod tests {
356 use super::*;
357 use crate::config::SqlEndpointConfig;
358 use camel_component_api::ExchangeEnvelope;
359 use camel_component_api::UriConfig;
360 use sqlx::any::AnyPoolOptions;
361 use std::sync::Arc;
362 use std::time::Duration;
363 use tokio::sync::mpsc;
364 use tokio_util::sync::CancellationToken;
365
366 async fn sqlite_pool() -> AnyPool {
367 sqlx::any::install_default_drivers();
368 AnyPoolOptions::new()
369 .max_connections(1)
370 .connect("sqlite::memory:")
371 .await
372 .expect("sqlite pool")
373 }
374
375 async fn seed_consumer_table(pool: &AnyPool) {
376 sqlx::query("CREATE TABLE jobs (id INTEGER PRIMARY KEY, processed INTEGER DEFAULT 0, failed INTEGER DEFAULT 0)")
377 .execute(pool)
378 .await
379 .expect("create table");
380 sqlx::query("INSERT INTO jobs (id, processed, failed) VALUES (1, 0, 0), (2, 0, 0)")
381 .execute(pool)
382 .await
383 .expect("seed rows");
384 }
385
386 fn config() -> SqlEndpointConfig {
387 let mut c =
388 SqlEndpointConfig::from_uri("sql:select * from t?db_url=postgres://localhost/test")
389 .unwrap();
390 c.resolve_defaults();
391 c
392 }
393
394 #[test]
395 fn consumer_concurrency_model() {
396 let c = SqlConsumer::new(config(), Arc::new(OnceCell::new()));
397 assert_eq!(c.concurrency_model(), ConcurrencyModel::Sequential);
398 }
399
400 #[test]
401 fn consumer_stores_config() {
402 let mut config = SqlEndpointConfig::from_uri(
403 "sql:select * from t?db_url=postgres://localhost/test&delay=2000&onConsume=update t set done=true"
404 ).unwrap();
405 config.resolve_defaults();
406 let c = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
407 assert_eq!(c.config.delay_ms, 2000);
408 assert!(c.config.on_consume.is_some());
409 }
410
411 #[tokio::test]
412 async fn poll_database_runs_on_consume_for_successful_rows() {
413 let pool = sqlite_pool().await;
414 seed_consumer_table(&pool).await;
415
416 let mut config = SqlEndpointConfig::from_uri(
417 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
418 )
419 .unwrap();
420 config.resolve_defaults();
421
422 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
423 let template = parse_query_template(&config.query, config.placeholder).unwrap();
424
425 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
426 tokio::spawn(async move {
427 while let Some(env) = rx.recv().await {
428 if let Some(reply_tx) = env.reply_tx {
429 let _ = reply_tx.send(Ok(env.exchange));
430 }
431 }
432 });
433 let ctx = ConsumerContext::new(tx, CancellationToken::new());
434
435 consumer
436 .poll_database(&pool, &ctx, &template)
437 .await
438 .expect("poll must succeed");
439
440 let row = sqlx::query("select processed from jobs where id = 1")
441 .fetch_one(&pool)
442 .await
443 .expect("row 1");
444 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
445
446 let row = sqlx::query("select processed from jobs where id = 2")
447 .fetch_one(&pool)
448 .await
449 .expect("row 2");
450 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
451
452 assert_eq!(processed_1, 1);
453 assert_eq!(processed_2, 1);
454 }
455
456 #[tokio::test]
457 async fn poll_database_runs_on_consume_failed_when_downstream_fails() {
458 let pool = sqlite_pool().await;
459 seed_consumer_table(&pool).await;
460
461 let mut config = SqlEndpointConfig::from_uri(
462 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&initialDelay=0&delay=1",
463 )
464 .unwrap();
465 config.resolve_defaults();
466
467 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
468 let template = parse_query_template(&config.query, config.placeholder).unwrap();
469
470 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
471 tokio::spawn(async move {
472 while let Some(env) = rx.recv().await {
473 if let Some(reply_tx) = env.reply_tx {
474 let _ =
475 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
476 }
477 }
478 });
479 let ctx = ConsumerContext::new(tx, CancellationToken::new());
480
481 consumer
482 .poll_database(&pool, &ctx, &template)
483 .await
484 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
485
486 let row = sqlx::query("select failed from jobs where id = 1")
487 .fetch_one(&pool)
488 .await
489 .expect("row 1");
490 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
491
492 let row = sqlx::query("select failed from jobs where id = 2")
493 .fetch_one(&pool)
494 .await
495 .expect("row 2");
496 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
497
498 assert_eq!(failed_1, 1);
499 assert_eq!(failed_2, 1);
500 }
501
502 #[tokio::test]
503 async fn poll_database_breaks_batch_on_consume_fail() {
504 let pool = sqlite_pool().await;
505 seed_consumer_table(&pool).await;
506
507 let mut config = SqlEndpointConfig::from_uri(
508 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&breakBatchOnConsumeFail=true&initialDelay=0&delay=1",
509 )
510 .unwrap();
511 config.resolve_defaults();
512
513 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
514 let template = parse_query_template(&config.query, config.placeholder).unwrap();
515
516 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
517 tokio::spawn(async move {
518 while let Some(env) = rx.recv().await {
519 if let Some(reply_tx) = env.reply_tx {
520 let _ =
521 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
522 }
523 }
524 });
525 let ctx = ConsumerContext::new(tx, CancellationToken::new());
526
527 let err = consumer
528 .poll_database(&pool, &ctx, &template)
529 .await
530 .expect_err("must stop on first downstream failure");
531 assert!(err.to_string().contains("downstream boom"));
532
533 let row = sqlx::query("select failed from jobs where id = 1")
534 .fetch_one(&pool)
535 .await
536 .expect("row 1");
537 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
538
539 let row = sqlx::query("select failed from jobs where id = 2")
540 .fetch_one(&pool)
541 .await
542 .expect("row 2");
543 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
544
545 assert_eq!(failed_1, 1);
546 assert_eq!(failed_2, 0, "second row must not be processed");
547 }
548
549 #[tokio::test]
555 async fn consumer_no_panic_without_prior_resolve_defaults() {
556 let config = SqlEndpointConfig::from_uri(
557 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
558 )
559 .unwrap();
560 assert!(config.max_connections.is_none());
562
563 let mut consumer = SqlConsumer::new(config, Arc::new(OnceCell::new()));
564 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
565 tokio::spawn(async move {
566 while let Some(env) = rx.recv().await {
567 if let Some(reply_tx) = env.reply_tx {
568 let _ = reply_tx.send(Ok(env.exchange));
569 }
570 }
571 });
572 let token = CancellationToken::new();
573 let ctx = ConsumerContext::new(tx, token.clone());
574
575 let consumer_handle = tokio::spawn(async move { consumer.start(ctx).await });
577
578 tokio::time::sleep(Duration::from_millis(50)).await;
580 token.cancel();
581
582 let result = consumer_handle.await.expect("task should not panic");
583 let _ = result;
585 }
586
587 #[tokio::test]
589 async fn stop_closes_pool() {
590 let pool = sqlite_pool().await;
591 seed_consumer_table(&pool).await;
592
593 let mut config = SqlEndpointConfig::from_uri(
594 "sql:select id from jobs?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&initialDelay=0&delay=1",
595 )
596 .unwrap();
597 config.resolve_defaults();
598
599 let pool_cell = Arc::new(OnceCell::new());
600 pool_cell.set(pool.clone()).unwrap();
601
602 let mut consumer = SqlConsumer::new(config, pool_cell);
603 consumer.stop().await.expect("stop should succeed");
604
605 assert!(
607 pool.is_closed(),
608 "Pool should be closed after consumer.stop()"
609 );
610 }
611
612 #[tokio::test]
614 async fn double_stop_is_safe() {
615 let pool = sqlite_pool().await;
616 let mut config = SqlEndpointConfig::from_uri(
617 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
618 )
619 .unwrap();
620 config.resolve_defaults();
621
622 let pool_cell = Arc::new(OnceCell::new());
623 pool_cell.set(pool.clone()).unwrap();
624
625 let mut consumer = SqlConsumer::new(config, pool_cell);
626 consumer.stop().await.expect("first stop should succeed");
627 consumer
628 .stop()
629 .await
630 .expect("second stop should also succeed");
631 }
632
633 #[tokio::test]
635 async fn start_after_stop_rejected() {
636 let pool = sqlite_pool().await;
637 let mut config = SqlEndpointConfig::from_uri(
638 "sql:select 1?db_url=sqlite::memory:&initialDelay=0&delay=1",
639 )
640 .unwrap();
641 config.resolve_defaults();
642
643 let pool_cell = Arc::new(OnceCell::new());
644 pool_cell.set(pool.clone()).unwrap();
645
646 let mut consumer = SqlConsumer::new(config, pool_cell);
647 consumer.stop().await.expect("stop should succeed");
648
649 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
650 tokio::spawn(async move {
651 while let Some(env) = rx.recv().await {
652 if let Some(reply_tx) = env.reply_tx {
653 let _ = reply_tx.send(Ok(env.exchange));
654 }
655 }
656 });
657 let ctx = ConsumerContext::new(tx, CancellationToken::new());
658
659 let result = consumer.start(ctx).await;
660 assert!(result.is_err());
661 let err_msg = result.unwrap_err().to_string();
662 assert!(
663 err_msg.contains("cannot be restarted") || err_msg.contains("after stop"),
664 "Expected restart error, got: {}",
665 err_msg
666 );
667 }
668
669 #[tokio::test]
671 async fn batch_mode_per_row_post_processing() {
672 let pool = sqlite_pool().await;
673 seed_consumer_table(&pool).await;
674
675 let mut config = SqlEndpointConfig::from_uri(
676 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsume=update jobs set processed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
677 )
678 .unwrap();
679 config.resolve_defaults();
680
681 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
682 let template = parse_query_template(&config.query, config.placeholder).unwrap();
683
684 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
685 tokio::spawn(async move {
686 while let Some(env) = rx.recv().await {
687 if let Some(reply_tx) = env.reply_tx {
688 let _ = reply_tx.send(Ok(env.exchange));
689 }
690 }
691 });
692 let ctx = ConsumerContext::new(tx, CancellationToken::new());
693
694 consumer
695 .poll_database(&pool, &ctx, &template)
696 .await
697 .expect("poll must succeed");
698
699 let row = sqlx::query("select processed from jobs where id = 1")
701 .fetch_one(&pool)
702 .await
703 .expect("row 1");
704 let processed_1: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
705
706 let row = sqlx::query("select processed from jobs where id = 2")
707 .fetch_one(&pool)
708 .await
709 .expect("row 2");
710 let processed_2: i64 = sqlx::Row::try_get(&row, 0).expect("processed");
711
712 assert_eq!(
713 processed_1, 1,
714 "row 1 should be marked processed via per-row onConsume"
715 );
716 assert_eq!(
717 processed_2, 1,
718 "row 2 should be marked processed via per-row onConsume"
719 );
720 }
721
722 #[tokio::test]
724 async fn batch_mode_per_row_post_processing_on_failure() {
725 let pool = sqlite_pool().await;
726 seed_consumer_table(&pool).await;
727
728 let mut config = SqlEndpointConfig::from_uri(
729 "sql:select id, processed, failed from jobs where processed = 0 order by id?db_url=sqlite::memory:&onConsumeFailed=update jobs set failed=1 where id=:#id&useIterator=false&initialDelay=0&delay=1",
730 )
731 .unwrap();
732 config.resolve_defaults();
733
734 let consumer = SqlConsumer::new(config.clone(), Arc::new(OnceCell::new()));
735 let template = parse_query_template(&config.query, config.placeholder).unwrap();
736
737 let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(8);
738 tokio::spawn(async move {
739 while let Some(env) = rx.recv().await {
740 if let Some(reply_tx) = env.reply_tx {
741 let _ =
742 reply_tx.send(Err(CamelError::ProcessorError("downstream boom".into())));
743 }
744 }
745 });
746 let ctx = ConsumerContext::new(tx, CancellationToken::new());
747
748 consumer
749 .poll_database(&pool, &ctx, &template)
750 .await
751 .expect("consumer should swallow downstream errors when breakBatchOnConsumeFail=false");
752
753 let row = sqlx::query("select failed from jobs where id = 1")
755 .fetch_one(&pool)
756 .await
757 .expect("row 1");
758 let failed_1: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
759
760 let row = sqlx::query("select failed from jobs where id = 2")
761 .fetch_one(&pool)
762 .await
763 .expect("row 2");
764 let failed_2: i64 = sqlx::Row::try_get(&row, 0).expect("failed");
765
766 assert_eq!(
767 failed_1, 1,
768 "row 1 should be marked failed via per-row onConsumeFailed"
769 );
770 assert_eq!(
771 failed_2, 1,
772 "row 2 should be marked failed via per-row onConsumeFailed"
773 );
774 }
775}