// outbox_relay/src/lib.rs

1#![doc = include_str!("../README.md")]
2use std::{process::Stdio, time::Duration};
3
4use bb8_postgres::{
5    bb8::Pool,
6    tokio_postgres::{
7        tls::{MakeTlsConnect, TlsConnect},
8        Socket,
9    },
10    PostgresConnectionManager,
11};
12use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
13use rdkafka::{config::RDKafkaLogLevel, producer::FutureProducer, ClientConfig};
14use serde_json::Value;
15use tokio::{
16    io::{AsyncBufReadExt, BufReader},
17    process::Command,
18    time,
19};
20use tracing::{debug, error, info};
21
22use crate::{
23    change::{handle_event, Event, SlotChange},
24    error::Error,
25};
26
27mod change;
28pub mod error;
29
30pub async fn outbox_relay<Tls>(
31    pool: Pool<PostgresConnectionManager<Tls>>,
32    database_url: &str,
33    redpanda_host: impl Into<String>,
34    slot: &str,
35) -> Result<(), Error>
36where
37    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
38    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
39    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
40    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
41{
42    // Create producer
43    let redpanda_host = redpanda_host.into();
44    debug!(%redpanda_host, "connecting to redpanda");
45    let mut producer_config = ClientConfig::new();
46    producer_config
47        .set("bootstrap.servers", redpanda_host)
48        .set_log_level(RDKafkaLogLevel::Debug);
49    let producer: FutureProducer = producer_config.create()?;
50
51    {
52        let producer = producer.clone();
53        let pool = pool.clone();
54
55        tokio::spawn(async move {
56            let conn = pool
57                .get()
58                .await
59                .expect("could not get connection from pool");
60            let mut workers = FuturesUnordered::new();
61
62            loop {
63                workers.clear();
64                time::sleep(Duration::from_secs(3)).await;
65
66                let events = conn
67                    .query(
68                        r#"
69                        SELECT
70                            "event"."id",
71                            "event"."created_at"::TEXT,
72                            "event"."aggregate_type",
73                            "event"."aggregate_id",
74                            "event"."sequence",
75                            "event"."event_data"
76                        FROM "event"
77                        INNER JOIN "outbox" ON "event"."id" = "outbox"."id"
78                        ORDER BY "event"."id" ASC
79                        LIMIT 10
80                        "#,
81                        &[],
82                    )
83                    .await
84                    .expect("select outbox events");
85
86                let aggregate_events: Vec<Vec<Event>> = events
87                    .into_iter()
88                    .filter_map(|row| {
89                        Some(Event {
90                            id: row.get(0),
91                            created_at: row.get(1),
92                            aggregate_type: row.get(2),
93                            aggregate_id: row.get(3),
94                            sequence: row.get(4),
95                            event: row.get::<_, Value>(5).as_object()?.clone(),
96                        })
97                    })
98                    .fold(Vec::new(), |mut acc, event| {
99                        let inner = acc.iter_mut().find(|events| {
100                            events.iter().next().unwrap().aggregate_id == event.aggregate_id
101                        });
102                        match inner {
103                            Some(inner) => {
104                                inner.push(event);
105                            }
106                            None => acc.push(vec![event]),
107                        }
108                        acc
109                    });
110
111                for events in aggregate_events {
112                    let producer = producer.clone();
113                    let pool = pool.clone();
114
115                    workers.push(
116                        async move {
117                            for event in events {
118                                let producer = producer.clone();
119                                let pool = pool.clone();
120
121                                if let Err(err) = handle_event(event, producer, pool).await {
122                                    error!(?err, "error handling event");
123                                }
124                            }
125                        }
126                        .boxed(),
127                    );
128                }
129
130                while workers.next().await.is_some() {}
131            }
132        });
133    }
134
135    Command::new("pg_recvlogical")
136        .args(&[
137            "-d",
138            database_url,
139            "--slot",
140            slot,
141            "--create-slot",
142            "--if-not-exists",
143            "-P",
144            "wal2json",
145        ])
146        .stdout(Stdio::piped())
147        .spawn()?
148        .wait()
149        .await?;
150
151    let out = Command::new("pg_recvlogical")
152        .args(&["-d", database_url, "--slot", slot, "--start", "-f", "-"])
153        .stdout(Stdio::piped())
154        .spawn()?
155        .stdout
156        .ok_or_else(Error::no_stdout)?;
157
158    let mut lines = BufReader::new(out).lines();
159    info!("watching changes");
160    while let Some(line) = lines.next_line().await? {
161        match serde_json::from_str::<SlotChange>(&line) {
162            Ok(SlotChange { change: changes }) => {
163                for change in changes.into_iter().filter(|change| {
164                    change.schema == "public"
165                        && change.table == "event"
166                        && change.kind == "insert"
167                        && change.columnvalues.is_some()
168                }) {
169                    let producer = producer.clone(); // Fields are wrapped in Arc
170                    let pool = pool.clone(); // Fields are wrapped in Arc
171                    tokio::spawn(async move {
172                        let event: Event = change.columnvalues.unwrap().try_into().unwrap();
173
174                        if let Err(err) = handle_event(event, producer, pool).await {
175                            error!(?err, "error handling event");
176                        }
177                    });
178                }
179            }
180            Err(_) => {
181                error!(%line, "unrecognised json format");
182            }
183        }
184    }
185
186    Ok(())
187}