1use crate::config::PostgresCdcSourceConfig;
4use crate::pgoutput::decoder::decode_message;
5use crate::pgoutput::messages::{
6 Delete, Insert, Message, Relation, Truncate, TupleCell, TupleData, Update,
7};
8use crate::pgoutput::registry::RelationRegistry;
9use crate::pgoutput::values::text_to_json;
10use crate::replication::{
11 self, ReplicationEvent, ReplicationParams, postgres_clock_to_unix_ms, recv, send_status_update,
12};
13use crate::state::{Bookmark, format_lsn, parse_lsn, state_key};
14use async_trait::async_trait;
15use faucet_core::{FaucetError, Source, Stream, StreamPage};
16use serde_json::{Map, Value, json};
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::time::{Duration, Instant};
20use tokio::sync::Mutex;
21
22pub struct PostgresCdcSource {
23 config: PostgresCdcSourceConfig,
24 state_key_value: String,
25 pending_bookmark: Mutex<Option<Bookmark>>,
29 confirmed_lsn: Mutex<u64>,
39}
40
41impl PostgresCdcSource {
42 pub async fn new(config: PostgresCdcSourceConfig) -> Result<Self, FaucetError> {
43 config.validate()?;
44 let key = state_key(&config.slot_name);
45 let initial_lsn = match config.start_lsn.as_deref() {
46 Some(s) => parse_lsn(s)?,
47 None => 0,
48 };
49 Ok(Self {
50 config,
51 state_key_value: key,
52 pending_bookmark: Mutex::new(None),
53 confirmed_lsn: Mutex::new(initial_lsn),
54 })
55 }
56
57 pub async fn drop_slot(&self) -> Result<(), FaucetError> {
62 replication::drop_slot(&self.config.connection_url, &self.config.slot_name).await
63 }
64}
65
66#[async_trait]
67impl Source for PostgresCdcSource {
68 async fn fetch_with_context(
69 &self,
70 ctx: &HashMap<String, Value>,
71 ) -> Result<Vec<Value>, FaucetError> {
72 let (records, _bookmark) = self.fetch_with_context_incremental(ctx).await?;
73 Ok(records)
74 }
75
76 async fn fetch_with_context_incremental(
88 &self,
89 ctx: &HashMap<String, Value>,
90 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
91 use futures::StreamExt;
92 let mut pages = self.stream_pages_with_batch_size(ctx, 0);
93 let mut all: Vec<Value> = Vec::new();
94 let mut bookmark: Option<Value> = None;
95 while let Some(page) = pages.next().await {
96 let page = page?;
97 all.extend(page.records);
98 if page.bookmark.is_some() {
99 bookmark = page.bookmark;
100 }
101 }
102 Ok((all, bookmark))
103 }
104
105 fn stream_pages<'a>(
128 &'a self,
129 ctx: &'a HashMap<String, Value>,
130 _batch_size: usize,
131 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
132 self.stream_pages_with_batch_size(ctx, self.config.batch_size)
133 }
134
135 fn config_schema(&self) -> Value {
136 let schema = schemars::schema_for!(PostgresCdcSourceConfig);
137 serde_json::to_value(&schema).unwrap_or(Value::Null)
138 }
139
140 fn state_key(&self) -> Option<String> {
141 Some(self.state_key_value.clone())
142 }
143
144 async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
145 let parsed = Bookmark::from_value(bookmark)?;
146 *self.confirmed_lsn.lock().await = parsed.as_u64()?;
149 *self.pending_bookmark.lock().await = Some(parsed);
150 Ok(())
151 }
152
153 fn connector_name(&self) -> &'static str {
154 "postgres-cdc"
155 }
156
157 async fn check(
173 &self,
174 ctx: &faucet_core::check::CheckContext,
175 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
176 use faucet_core::check::{CheckReport, Probe};
177 use sqlx::ConnectOptions as _;
178 use sqlx::postgres::{PgConnectOptions, PgConnection};
179
180 let start = std::time::Instant::now();
181
182 let opts: PgConnectOptions = match self.config.connection_url.parse() {
184 Ok(o) => o,
185 Err(e) => {
186 return Ok(CheckReport::single(Probe::fail_hint(
187 "auth",
188 start.elapsed(),
189 format!("invalid connection URL: {e}"),
190 "connection_url must be a valid postgres:// URL",
191 )));
192 }
193 };
194
195 let probe = async {
198 let mut conn: PgConnection = opts.connect().await.map_err(|e| {
199 Probe::fail_hint(
200 "auth",
201 start.elapsed(),
202 format!("could not connect: {e}"),
203 "verify the host is reachable and credentials are valid",
204 )
205 })?;
206
207 let row: Option<(String,)> = sqlx::query_as(
208 "SELECT slot_name::text FROM pg_replication_slots WHERE slot_name = $1",
209 )
210 .bind(&self.config.slot_name)
211 .fetch_optional(&mut conn)
212 .await
213 .map_err(|e| {
214 Probe::fail(
215 "slot",
216 start.elapsed(),
217 format!("could not query pg_replication_slots: {e}"),
218 )
219 })?;
220
221 Ok::<Probe, Probe>(match row {
222 Some(_) => Probe::pass("slot", start.elapsed()),
223 None => Probe::skip(
224 "slot",
225 format!(
226 "replication slot {} does not exist yet (faucet run can create it)",
227 self.config.slot_name
228 ),
229 ),
230 })
231 };
232
233 let probe = match tokio::time::timeout(ctx.timeout, probe).await {
234 Ok(Ok(p)) | Ok(Err(p)) => p,
235 Err(_elapsed) => Probe::fail_hint(
236 "auth",
237 start.elapsed(),
238 "connection timed out",
239 "the database did not respond within the check timeout",
240 ),
241 };
242 Ok(CheckReport::single(probe))
243 }
244}
245
246impl PostgresCdcSource {
247 fn stream_pages_with_batch_size<'a>(
258 &'a self,
259 _ctx: &'a HashMap<String, Value>,
260 batch_size: usize,
261 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
262 let max_messages = self.config.max_messages.unwrap_or(usize::MAX);
263 let idle_timeout = self.config.idle_timeout;
264 let per_transaction = batch_size != 0;
265
266 Box::pin(async_stream::try_stream! {
267 let pending = {
269 let mut g = self.pending_bookmark.lock().await;
270 g.take()
271 };
272 let start_lsn = if let Some(b) = pending.as_ref() {
273 let lsn = b.as_u64()?;
274 *self.confirmed_lsn.lock().await = lsn;
275 Some(lsn)
276 } else {
277 self.config
278 .start_lsn
279 .as_deref()
280 .map(parse_lsn)
281 .transpose()?
282 };
283
284 let params = ReplicationParams {
286 connection_url: &self.config.connection_url,
287 slot_name: &self.config.slot_name,
288 publication_name: &self.config.publication_name,
289 proto_version: self.config.proto_version,
290 create_slot_if_missing: self.config.create_slot_if_missing,
291 start_lsn,
292 status_update_interval: self.config.status_update_interval,
293 tcp_keepalive: self.config.tcp_keepalive,
294 slot_type: self.config.slot_type,
295 tls: &self.config.tls,
296 };
297 let client = replication::connect(¶ms).await?;
298 replication::ensure_slot(
299 &client,
300 &self.config.connection_url,
301 &self.config.slot_name,
302 self.config.create_slot_if_missing,
303 self.config.slot_type,
304 )
305 .await?;
306
307 if let Some(lsn) = start_lsn {
318 replication::retry_on_slot_active(self.config.slot_acquire_retries, || {
323 replication::advance_slot(
324 &self.config.connection_url,
325 &self.config.slot_name,
326 lsn,
327 )
328 })
329 .await?;
330 }
331
332 let mut duplex = replication::retry_on_slot_active(
333 self.config.slot_acquire_retries,
334 || replication::start_replication(&client, ¶ms),
335 )
336 .await?;
337
338 let initial_confirmed = *self.confirmed_lsn.lock().await;
341 send_status_update(&mut duplex, initial_confirmed, false).await?;
342
343 let mut registry = RelationRegistry::new();
350 let mut state = TxnState {
351 max_staged_records: self.config.max_staged_records,
352 ..TxnState::default()
353 };
354 let mut agg_records: Vec<Value> = Vec::new();
355 let mut total_records: usize = 0;
356 let mut last_message_at = Instant::now();
357
358 loop {
359 let idle_deadline = last_message_at + idle_timeout;
360 let budget = idle_deadline
361 .checked_duration_since(Instant::now())
362 .unwrap_or(Duration::ZERO);
363
364 let mut stop = false;
368 let mut just_committed: Option<(u64, Vec<Value>)> = None;
369 let mut fatal: Option<FaucetError> = None;
370 let mut unexpected_end = false;
371 tokio::select! {
372 biased;
373 _ = tokio::signal::ctrl_c() => {
374 tracing::info!("postgres-cdc: ctrl_c received, stopping cleanly");
375 stop = true;
376 }
377 ev = tokio::time::timeout(budget, recv(&mut duplex)) => {
378 match ev {
379 Ok(Ok(Some(event))) => {
380 last_message_at = Instant::now();
384 let was_in_txn = state.in_txn;
385 let pre_commit_count = state.last_committed;
386 let mut committed_records: Vec<Value> = Vec::new();
387 if let Err(e) = handle_event(
388 event,
389 &mut registry,
390 &mut state,
391 &mut committed_records,
392 ) {
393 fatal = Some(e);
394 } else if was_in_txn
395 && !state.in_txn
396 && state.last_committed != pre_commit_count
397 {
398 let lsn = state.last_committed
402 .expect("last_committed set on commit");
403 total_records += committed_records.len();
404 just_committed = Some((lsn, committed_records));
405 }
406 }
407 Ok(Ok(None)) => {
408 unexpected_end = true;
409 }
410 Ok(Err(e)) => {
411 fatal = Some(e);
412 }
413 Err(_timeout) => {
414 tracing::debug!(
415 "postgres-cdc: idle_timeout reached, stopping"
416 );
417 stop = true;
418 }
419 }
420 }
421 }
422
423 if let Some(e) = fatal {
424 Err(e)?;
425 }
426 if unexpected_end {
427 Err(FaucetError::Source(
428 "postgres-cdc: replication stream ended unexpectedly".into(),
429 ))?;
430 }
431 if let Some((lsn, drained)) = just_committed {
432 if per_transaction {
443 let bookmark = Some(Bookmark::from_u64(lsn).to_value()?);
444 yield StreamPage {
445 records: drained,
446 bookmark,
447 };
448 } else {
449 agg_records.extend(drained);
450 }
451 if total_records >= max_messages {
452 stop = true;
453 }
454 }
455
456 if stop {
457 break;
458 }
459 }
460
461 if !per_transaction
467 && let Some(lsn) = state.last_committed
468 {
469 let bookmark = Some(Bookmark::from_u64(lsn).to_value()?);
470 yield StreamPage {
471 records: agg_records,
472 bookmark,
473 };
474 }
475
476 tracing::info!(
477 records = total_records,
478 batch_size,
479 "postgres-cdc: stream complete",
480 );
481 })
482 }
483}
484
485struct TupleRow {
488 values: Map<String, Value>,
489 unchanged_toast: Vec<String>,
490}
491
492#[derive(Default)]
494struct TxnState {
495 staged: Vec<Value>,
500 last_committed: Option<u64>,
502 in_progress_ts: i64,
505 in_progress_lsn: u64,
507 in_txn: bool,
509 max_staged_records: Option<usize>,
514}
515
516impl TxnState {
517 fn push_staged(&mut self, record: Value) -> Result<(), FaucetError> {
520 if let Some(max) = self.max_staged_records
521 && self.staged.len() >= max
522 {
523 return Err(FaucetError::Source(format!(
524 "postgres-cdc: in-progress transaction exceeded max_staged_records ({max}); \
525 aborting to avoid unbounded memory growth. Raise max_staged_records or \
526 reduce the size of the source transaction."
527 )));
528 }
529 self.staged.push(record);
530 Ok(())
531 }
532}
533
534fn handle_event(
535 event: ReplicationEvent,
536 registry: &mut RelationRegistry,
537 state: &mut TxnState,
538 out: &mut Vec<Value>,
539) -> Result<(), FaucetError> {
540 match event {
541 ReplicationEvent::Begin {
542 final_lsn,
543 commit_time_micros,
544 xid: _,
545 } => {
546 if state.in_txn {
547 return Err(FaucetError::Source(format!(
552 "postgres-cdc: BEGIN received while a previous transaction was still \
553 in progress ({} records staged) — replication stream desync",
554 state.staged.len()
555 )));
556 }
557 state.in_txn = true;
558 state.in_progress_lsn = final_lsn.as_u64();
559 state.in_progress_ts = commit_time_micros;
560 state.staged.clear();
561 }
562 ReplicationEvent::Commit {
563 lsn: _,
564 commit_time_micros: _,
565 end_lsn,
566 } => {
567 if !state.in_txn {
568 return Err(FaucetError::Source(
569 "postgres-cdc: COMMIT without BEGIN".into(),
570 ));
571 }
572 out.append(&mut state.staged);
590 state.last_committed = Some(end_lsn.as_u64());
591 state.in_txn = false;
592 }
593 ReplicationEvent::XLogData { data, .. } => {
594 let msg = decode_message(&data)?;
595 handle_pgoutput(msg, registry, state)?;
596 }
597 ReplicationEvent::Message { .. } => {
598 }
601 other => {
605 return Err(FaucetError::Source(format!(
606 "postgres-cdc: unhandled ReplicationEvent variant {other:?} — refusing to \
607 continue rather than risk silently dropping change data"
608 )));
609 }
610 }
611 Ok(())
612}
613
614fn handle_pgoutput(
615 msg: Message,
616 registry: &mut RelationRegistry,
617 state: &mut TxnState,
618) -> Result<(), FaucetError> {
619 match msg {
620 Message::Relation(r) => registry.insert(r),
621 Message::Origin | Message::Type => {} Message::Insert(i) => stage_insert(state, registry, i)?,
623 Message::Update(u) => stage_update(state, registry, u)?,
624 Message::Delete(d) => stage_delete(state, registry, d)?,
625 Message::Truncate(t) => stage_truncate(state, registry, t)?,
626 Message::Begin(_) | Message::Commit(_) => {
631 tracing::warn!(
632 "postgres-cdc: pgoutput Begin/Commit reached pgoutput decoder; \
633 pgwire-replication should have intercepted it"
634 );
635 }
636 }
637 Ok(())
638}
639
640fn stage_insert(
641 state: &mut TxnState,
642 registry: &RelationRegistry,
643 i: Insert,
644) -> Result<(), FaucetError> {
645 let rel = registry.get(i.relation_oid)?;
646 let after = tuple_to_object(rel, &i.new)?;
647 let r = record(rel, "insert", state, None, Some(after));
648 state.push_staged(r)
649}
650
651fn stage_update(
652 state: &mut TxnState,
653 registry: &RelationRegistry,
654 u: Update,
655) -> Result<(), FaucetError> {
656 let rel = registry.get(u.relation_oid)?;
657 let before = match &u.old {
658 Some(t) => Some(tuple_to_object(rel, t)?),
659 None => None,
660 };
661 let after = tuple_to_object(rel, &u.new)?;
662 let r = record(rel, "update", state, before, Some(after));
663 state.push_staged(r)
664}
665
666fn stage_delete(
667 state: &mut TxnState,
668 registry: &RelationRegistry,
669 d: Delete,
670) -> Result<(), FaucetError> {
671 let rel = registry.get(d.relation_oid)?;
672 let before = Some(tuple_to_object(rel, &d.old)?);
673 let r = record(rel, "delete", state, before, None);
674 state.push_staged(r)
675}
676
677fn stage_truncate(
678 state: &mut TxnState,
679 registry: &RelationRegistry,
680 t: Truncate,
681) -> Result<(), FaucetError> {
682 for oid in &t.relation_oids {
683 let rel = registry.get(*oid)?;
684 let r = record(rel, "truncate", state, None, None);
685 state.push_staged(r)?;
686 }
687 Ok(())
688}
689
690fn record(
691 rel: &Relation,
692 op: &str,
693 state: &TxnState,
694 before: Option<TupleRow>,
695 after: Option<TupleRow>,
696) -> Value {
697 fn to_value(row: TupleRow) -> Value {
698 let mut o = row.values;
699 if !row.unchanged_toast.is_empty() {
700 o.insert("__unchanged_toast__".into(), json!(row.unchanged_toast));
701 }
702 Value::Object(o)
703 }
704 let mut obj = Map::new();
705 obj.insert("op".into(), json!(op));
706 obj.insert("schema".into(), json!(rel.namespace));
707 obj.insert("table".into(), json!(rel.name));
708 obj.insert("lsn".into(), json!(format_lsn(state.in_progress_lsn)));
709 obj.insert(
710 "ts_ms".into(),
711 json!(postgres_clock_to_unix_ms(state.in_progress_ts)),
712 );
713 obj.insert("before".into(), before.map(to_value).unwrap_or(Value::Null));
714 obj.insert("after".into(), after.map(to_value).unwrap_or(Value::Null));
715 Value::Object(obj)
716}
717
718fn tuple_to_object(rel: &Relation, tup: &TupleData) -> Result<TupleRow, FaucetError> {
720 if tup.cells.len() != rel.columns.len() {
721 return Err(FaucetError::Source(format!(
722 "postgres-cdc: tuple has {} cells but relation {}.{} has {} columns",
723 tup.cells.len(),
724 rel.namespace,
725 rel.name,
726 rel.columns.len()
727 )));
728 }
729 let mut values = Map::with_capacity(rel.columns.len());
730 let mut unchanged_toast = Vec::new();
731 for (col, cell) in rel.columns.iter().zip(&tup.cells) {
732 match cell {
733 TupleCell::Null => {
734 values.insert(col.name.clone(), Value::Null);
735 }
736 TupleCell::UnchangedToast => {
737 unchanged_toast.push(col.name.clone());
738 }
739 TupleCell::Text(s) => {
740 values.insert(col.name.clone(), text_to_json(col.type_oid, s)?);
741 }
742 }
743 }
744 Ok(TupleRow {
745 values,
746 unchanged_toast,
747 })
748}
749
#[cfg(test)]
mod tests {
    // Unit tests for the BEGIN/COMMIT state machine in `handle_event` and the pgoutput row
    // decoding it drives. Each DML payload is a hand-encoded pgoutput message wrapped in a
    // `ReplicationEvent::XLogData`; no database connection is involved.

    use super::*;
    use crate::pgoutput::messages::{ColumnDesc, ReplicaIdentity};
    use crate::replication::ReplicationEvent;
    use pgwire_replication::Lsn;

    /// Relation metadata for `public.users` (OID 16384).
    ///
    /// Columns are `id` (type OID 23, int4) and `name` (type OID 25, text).
    fn rel_users() -> Relation {
        Relation {
            oid: 16384,
            namespace: "public".into(),
            name: "users".into(),
            replica_identity: ReplicaIdentity::Default,
            columns: vec![
                ColumnDesc {
                    // Column flag 1 marks `id` as part of the key.
                    flags: 1,
                    name: "id".into(),
                    type_oid: 23,
                    type_modifier: -1,
                },
                ColumnDesc {
                    flags: 0,
                    name: "name".into(),
                    type_oid: 25,
                    type_modifier: -1,
                },
            ],
        }
    }

    /// Wraps a raw pgoutput payload in an `XLogData` replication event.
    fn xlogdata(payload: Vec<u8>) -> ReplicationEvent {
        ReplicationEvent::XLogData {
            wal_start: Lsn::from_u64(0),
            wal_end: Lsn::from_u64(0x16A_4F88),
            server_time_micros: 0,
            data: bytes::Bytes::from(payload),
        }
    }

    /// Encodes a pgoutput Insert: `'I'`, relation OID, `'N'` (new tuple), cell count, cells.
    ///
    /// Only the values in `cells` are encoded; the names are there for readability.
    fn insert_payload(relation_oid: u32, cells: &[(&str, &str)]) -> Vec<u8> {
        let mut buf: Vec<u8> = Vec::new();
        buf.push(b'I');
        buf.extend_from_slice(&relation_oid.to_be_bytes());
        buf.push(b'N');
        buf.extend_from_slice(&(cells.len() as u16).to_be_bytes());
        for (_, val) in cells {
            text_cell(&mut buf, val);
        }
        buf
    }

    /// Encodes a pgoutput Update with a full old tuple (`'O'`) followed by the new one (`'N'`).
    fn update_full_payload(
        relation_oid: u32,
        old_cells: &[(&str, &str)],
        new_cells: &[(&str, &str)],
    ) -> Vec<u8> {
        let mut buf: Vec<u8> = Vec::new();
        buf.push(b'U');
        buf.extend_from_slice(&relation_oid.to_be_bytes());
        buf.push(b'O');
        buf.extend_from_slice(&(old_cells.len() as u16).to_be_bytes());
        for (_, val) in old_cells {
            text_cell(&mut buf, val);
        }
        buf.push(b'N');
        buf.extend_from_slice(&(new_cells.len() as u16).to_be_bytes());
        for (_, val) in new_cells {
            text_cell(&mut buf, val);
        }
        buf
    }

    /// Encodes a pgoutput Delete carrying a full old tuple (`'O'`).
    fn delete_full_payload(relation_oid: u32, old_cells: &[(&str, &str)]) -> Vec<u8> {
        let mut buf: Vec<u8> = Vec::new();
        buf.push(b'D');
        buf.extend_from_slice(&relation_oid.to_be_bytes());
        buf.push(b'O');
        buf.extend_from_slice(&(old_cells.len() as u16).to_be_bytes());
        for (_, val) in old_cells {
            text_cell(&mut buf, val);
        }
        buf
    }

    /// Encodes a pgoutput Truncate: `'T'`, relation count, options byte, then the OIDs.
    fn truncate_payload(relation_oids: &[u32]) -> Vec<u8> {
        let mut buf: Vec<u8> = Vec::new();
        buf.push(b'T');
        buf.extend_from_slice(&(relation_oids.len() as u32).to_be_bytes());
        // Options byte 0: neither CASCADE nor RESTART IDENTITY.
        buf.push(0u8);
        for oid in relation_oids {
            buf.extend_from_slice(&oid.to_be_bytes());
        }
        buf
    }

    /// Appends one text-format tuple cell: `'t'`, big-endian u32 byte length, then the bytes.
    fn text_cell(buf: &mut Vec<u8>, val: &str) {
        buf.push(b't');
        buf.extend_from_slice(&(val.len() as u32).to_be_bytes());
        buf.extend_from_slice(val.as_bytes());
    }

    /// BEGIN event with the given `final_lsn`, xid 1 and commit time 0.
    fn begin_event(final_lsn: u64) -> ReplicationEvent {
        ReplicationEvent::Begin {
            final_lsn: Lsn::from_u64(final_lsn),
            xid: 1,
            commit_time_micros: 0,
        }
    }

    /// COMMIT event at `lsn`, with `end_lsn` set to `lsn + 0x10`.
    fn commit_event(lsn: u64) -> ReplicationEvent {
        ReplicationEvent::Commit {
            lsn: Lsn::from_u64(lsn),
            end_lsn: Lsn::from_u64(lsn + 0x10),
            commit_time_micros: 0,
        }
    }

    // Records stay staged until COMMIT, then come out fully populated.
    #[test]
    fn full_transaction_promotes_to_output_on_commit() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel_users());
        let mut state = TxnState::default();
        let mut out = vec![];

        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
        assert!(out.is_empty());

        handle_event(
            xlogdata(insert_payload(16384, &[("id", "1"), ("name", "alice")])),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();
        assert!(out.is_empty(), "records stay staged until COMMIT");

        handle_event(
            commit_event(0x16A_4F88),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();

        assert_eq!(out.len(), 1);
        assert_eq!(out[0]["op"], "insert");
        assert_eq!(out[0]["schema"], "public");
        assert_eq!(out[0]["table"], "users");
        // `lsn` on the record is the BEGIN's final_lsn.
        assert_eq!(out[0]["lsn"], "0/16A4F88");
        // The int4 column decodes to a JSON number and the text column to a string.
        assert_eq!(out[0]["after"]["id"], 1);
        assert_eq!(out[0]["after"]["name"], "alice");
        assert_eq!(out[0]["before"], Value::Null);

        // The bookmark position is the COMMIT's end_lsn, not its lsn.
        assert_eq!(state.last_committed, Some(0x16A_4F88 + 0x10));
    }

    // With the cap at 2, two inserts fill the buffer exactly and the third must fail.
    #[test]
    fn staging_beyond_max_staged_records_aborts() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel_users());
        let mut state = TxnState {
            max_staged_records: Some(2),
            ..TxnState::default()
        };
        let mut out = vec![];

        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
        for id in ["1", "2"] {
            handle_event(
                xlogdata(insert_payload(16384, &[("id", id), ("name", "x")])),
                &mut registry,
                &mut state,
                &mut out,
            )
            .unwrap();
        }
        let err = handle_event(
            xlogdata(insert_payload(16384, &[("id", "3"), ("name", "x")])),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap_err();
        assert!(
            format!("{err}").contains("max_staged_records"),
            "error must name the cap: {err}"
        );
        assert!(matches!(err, FaucetError::Source(_)));
    }

    // Without a cap (the default `None`), a 50-row transaction is emitted in full.
    #[test]
    fn no_cap_allows_large_transactions() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel_users());
        let mut state = TxnState::default();
        let mut out = vec![];

        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
        for id in 0..50 {
            handle_event(
                xlogdata(insert_payload(
                    16384,
                    &[("id", &id.to_string()), ("name", "x")],
                )),
                &mut registry,
                &mut state,
                &mut out,
            )
            .unwrap();
        }
        handle_event(
            commit_event(0x16A_4F88),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();
        assert_eq!(out.len(), 50);
    }

    // A COMMIT with no open transaction is a desync and must be rejected.
    #[test]
    fn commit_without_begin_errors() {
        let mut registry = RelationRegistry::new();
        let mut state = TxnState::default();
        let mut out = vec![];

        let err = handle_event(
            ReplicationEvent::Commit {
                lsn: Lsn::from_u64(1),
                end_lsn: Lsn::from_u64(2),
                commit_time_micros: 0,
            },
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap_err();
        assert!(format!("{err}").contains("COMMIT without BEGIN"));
    }

    // A second BEGIN while records are staged must error rather than drop them.
    #[test]
    fn double_begin_errors() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel_users());
        let mut state = TxnState::default();
        let mut out = vec![];

        handle_event(begin_event(0x100), &mut registry, &mut state, &mut out).unwrap();
        handle_event(
            xlogdata(insert_payload(16384, &[("id", "1"), ("name", "alice")])),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();

        let err =
            handle_event(begin_event(0x200), &mut registry, &mut state, &mut out).unwrap_err();
        assert!(format!("{err}").contains("desync"), "{err}");
    }

    // DML for a relation OID never announced by a Relation message errors and names the OID.
    #[test]
    fn unknown_relation_in_insert_errors() {
        let mut registry = RelationRegistry::new();
        let mut state = TxnState::default();
        let mut out = vec![];

        handle_event(begin_event(1), &mut registry, &mut state, &mut out).unwrap();
        let err = handle_event(
            xlogdata(insert_payload(99999, &[("id", "1"), ("name", "alice")])),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap_err();
        assert!(format!("{err}").contains("99999"));
    }

    // An Update carrying an old tuple yields both before and after images.
    #[test]
    fn update_with_replica_identity_full_emits_before_and_after() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel_users());
        let mut state = TxnState::default();
        let mut out = vec![];

        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
        handle_event(
            xlogdata(update_full_payload(
                16384,
                &[("id", "1"), ("name", "alice")],
                &[("id", "1"), ("name", "alice2")],
            )),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();
        handle_event(
            commit_event(0x16A_4F88),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();

        assert_eq!(out.len(), 1);
        assert_eq!(out[0]["op"], "update");
        assert_eq!(out[0]["before"]["id"], 1);
        assert_eq!(out[0]["before"]["name"], "alice");
        assert_eq!(out[0]["after"]["name"], "alice2");
    }

    // A Delete yields a before image and a null after image.
    #[test]
    fn delete_with_replica_identity_full_emits_before_only() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel_users());
        let mut state = TxnState::default();
        let mut out = vec![];

        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
        handle_event(
            xlogdata(delete_full_payload(
                16384,
                &[("id", "1"), ("name", "alice")],
            )),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();
        handle_event(
            commit_event(0x16A_4F88),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();

        assert_eq!(out.len(), 1);
        assert_eq!(out[0]["op"], "delete");
        assert_eq!(out[0]["before"]["id"], 1);
        assert_eq!(out[0]["before"]["name"], "alice");
        assert_eq!(out[0]["after"], Value::Null);
    }

    // A multi-table Truncate produces one `truncate` record per relation.
    #[test]
    fn truncate_emits_one_record_per_relation() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel_users());
        let mut second = rel_users();
        // Same column layout registered under a different OID and table name.
        second.oid = 16385;
        second.name = "orders".into();
        registry.insert(second);

        let mut state = TxnState::default();
        let mut out = vec![];

        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
        handle_event(
            xlogdata(truncate_payload(&[16384, 16385])),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();
        handle_event(
            commit_event(0x16A_4F88),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();

        assert_eq!(out.len(), 2);
        assert!(out.iter().all(|r| r["op"] == "truncate"));
        let tables: Vec<_> = out.iter().map(|r| r["table"].as_str().unwrap()).collect();
        assert!(tables.contains(&"users"));
        assert!(tables.contains(&"orders"));
    }

    // An unchanged-TOAST cell is omitted from the image and listed under `__unchanged_toast__`.
    #[test]
    fn unchanged_toast_in_before_surfaces_via_metadata() {
        let mut registry = RelationRegistry::new();
        registry.insert(rel_users());
        let mut state = TxnState::default();
        let mut out = vec![];

        handle_event(begin_event(0x16A_4F88), &mut registry, &mut state, &mut out).unwrap();
        // Hand-built Update: the old tuple has `id` as text and `name` as unchanged TOAST.
        let mut buf: Vec<u8> = Vec::new();
        buf.push(b'U');
        buf.extend_from_slice(&16384u32.to_be_bytes());
        buf.push(b'O');
        buf.extend_from_slice(&2u16.to_be_bytes());
        text_cell(&mut buf, "1");
        // 'u' = unchanged TOAST cell (no length or data follows).
        buf.push(b'u');
        // New tuple: both columns as text.
        buf.push(b'N');
        buf.extend_from_slice(&2u16.to_be_bytes());
        text_cell(&mut buf, "1");
        text_cell(&mut buf, "alice2");
        handle_event(xlogdata(buf), &mut registry, &mut state, &mut out).unwrap();
        handle_event(
            commit_event(0x16A_4F88),
            &mut registry,
            &mut state,
            &mut out,
        )
        .unwrap();

        assert_eq!(out.len(), 1);
        assert_eq!(out[0]["before"]["__unchanged_toast__"], json!(["name"]));
        assert!(out[0]["before"].get("name").is_none());
        assert_eq!(out[0]["before"]["id"], 1);
        assert_eq!(out[0]["after"]["name"], "alice2");
    }
}