1#![doc = include_str!("../README.md")]
2use std::{process::Stdio, time::Duration};
3
4use bb8_postgres::{
5 bb8::Pool,
6 tokio_postgres::{
7 tls::{MakeTlsConnect, TlsConnect},
8 Socket,
9 },
10 PostgresConnectionManager,
11};
12use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
13use rdkafka::{config::RDKafkaLogLevel, producer::FutureProducer, ClientConfig};
14use serde_json::Value;
15use tokio::{
16 io::{AsyncBufReadExt, BufReader},
17 process::Command,
18 time,
19};
20use tracing::{debug, error, info};
21
22use crate::{
23 change::{handle_event, Event, SlotChange},
24 error::Error,
25};
26
27mod change;
28pub mod error;
29
30pub async fn outbox_relay<Tls>(
31 pool: Pool<PostgresConnectionManager<Tls>>,
32 database_url: &str,
33 redpanda_host: impl Into<String>,
34 slot: &str,
35) -> Result<(), Error>
36where
37 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
38 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
39 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
40 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
41{
42 let redpanda_host = redpanda_host.into();
44 debug!(%redpanda_host, "connecting to redpanda");
45 let mut producer_config = ClientConfig::new();
46 producer_config
47 .set("bootstrap.servers", redpanda_host)
48 .set_log_level(RDKafkaLogLevel::Debug);
49 let producer: FutureProducer = producer_config.create()?;
50
51 {
52 let producer = producer.clone();
53 let pool = pool.clone();
54
55 tokio::spawn(async move {
56 let conn = pool
57 .get()
58 .await
59 .expect("could not get connection from pool");
60 let mut workers = FuturesUnordered::new();
61
62 loop {
63 workers.clear();
64 time::sleep(Duration::from_secs(3)).await;
65
66 let events = conn
67 .query(
68 r#"
69 SELECT
70 "event"."id",
71 "event"."created_at"::TEXT,
72 "event"."aggregate_type",
73 "event"."aggregate_id",
74 "event"."sequence",
75 "event"."event_data"
76 FROM "event"
77 INNER JOIN "outbox" ON "event"."id" = "outbox"."id"
78 ORDER BY "event"."id" ASC
79 LIMIT 10
80 "#,
81 &[],
82 )
83 .await
84 .expect("select outbox events");
85
86 let aggregate_events: Vec<Vec<Event>> = events
87 .into_iter()
88 .filter_map(|row| {
89 Some(Event {
90 id: row.get(0),
91 created_at: row.get(1),
92 aggregate_type: row.get(2),
93 aggregate_id: row.get(3),
94 sequence: row.get(4),
95 event: row.get::<_, Value>(5).as_object()?.clone(),
96 })
97 })
98 .fold(Vec::new(), |mut acc, event| {
99 let inner = acc.iter_mut().find(|events| {
100 events.iter().next().unwrap().aggregate_id == event.aggregate_id
101 });
102 match inner {
103 Some(inner) => {
104 inner.push(event);
105 }
106 None => acc.push(vec![event]),
107 }
108 acc
109 });
110
111 for events in aggregate_events {
112 let producer = producer.clone();
113 let pool = pool.clone();
114
115 workers.push(
116 async move {
117 for event in events {
118 let producer = producer.clone();
119 let pool = pool.clone();
120
121 if let Err(err) = handle_event(event, producer, pool).await {
122 error!(?err, "error handling event");
123 }
124 }
125 }
126 .boxed(),
127 );
128 }
129
130 while workers.next().await.is_some() {}
131 }
132 });
133 }
134
135 Command::new("pg_recvlogical")
136 .args(&[
137 "-d",
138 database_url,
139 "--slot",
140 slot,
141 "--create-slot",
142 "--if-not-exists",
143 "-P",
144 "wal2json",
145 ])
146 .stdout(Stdio::piped())
147 .spawn()?
148 .wait()
149 .await?;
150
151 let out = Command::new("pg_recvlogical")
152 .args(&["-d", database_url, "--slot", slot, "--start", "-f", "-"])
153 .stdout(Stdio::piped())
154 .spawn()?
155 .stdout
156 .ok_or_else(Error::no_stdout)?;
157
158 let mut lines = BufReader::new(out).lines();
159 info!("watching changes");
160 while let Some(line) = lines.next_line().await? {
161 match serde_json::from_str::<SlotChange>(&line) {
162 Ok(SlotChange { change: changes }) => {
163 for change in changes.into_iter().filter(|change| {
164 change.schema == "public"
165 && change.table == "event"
166 && change.kind == "insert"
167 && change.columnvalues.is_some()
168 }) {
169 let producer = producer.clone(); let pool = pool.clone(); tokio::spawn(async move {
172 let event: Event = change.columnvalues.unwrap().try_into().unwrap();
173
174 if let Err(err) = handle_event(event, producer, pool).await {
175 error!(?err, "error handling event");
176 }
177 });
178 }
179 }
180 Err(_) => {
181 error!(%line, "unrecognised json format");
182 }
183 }
184 }
185
186 Ok(())
187}