#![allow(clippy::unwrap_used, unused_crate_dependencies)]
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use engine::Engine;
use schema_core::{
Column, ColumnName, DatabaseSchema, Field, FieldName, FieldSource, FlussoType, GenericValue,
IndexName, IndexSchema, TableName,
};
use sinks_core::{Result as SinkResult, Sink};
use sources_core::SourceSpec;
use sources_postgres::{PgDocumentBuilder, ReplicationConfig, WalChangeCapture};
use sqlx::postgres::PgPoolOptions;
use testcontainers_modules::postgres::Postgres;
use testcontainers_modules::testcontainers::ImageExt;
use testcontainers_modules::testcontainers::runners::AsyncRunner;
/// Test double for [`Sink`] that records every upsert and delete it receives
/// as a human-readable string instead of writing to a real search backend.
#[derive(Debug)]
struct RecordingSink {
/// Log of operations in arrival order, formatted as `"upsert <index> <id>"`
/// or `"delete <index> <id>"`. Wrapped in `Arc<Mutex<..>>` so the test body
/// can keep its own handle and inspect the log while the sink is in use.
ops: Arc<Mutex<Vec<String>>>,
}
impl RecordingSink {
    /// Appends one already-formatted operation to the shared log.
    ///
    /// Panics if the mutex has been poisoned by an earlier panic.
    fn record(&self, entry: String) {
        let mut log = self.ops.lock().unwrap();
        log.push(entry);
    }
}

/// `Sink` implementation that never fails: writes are only logged, and
/// flushing always reports a clean result.
#[async_trait]
impl Sink for RecordingSink {
    /// Logs `"upsert <index> <id>"`; the document body is ignored.
    async fn upsert(
        &self,
        index: &IndexName,
        id: &str,
        _document: &GenericValue,
    ) -> SinkResult<()> {
        let entry = format!("upsert {} {id}", index.as_ref());
        self.record(entry);
        Ok(())
    }

    /// Logs `"delete <index> <id>"`.
    async fn delete(&self, index: &IndexName, id: &str) -> SinkResult<()> {
        let entry = format!("delete {} {id}", index.as_ref());
        self.record(entry);
        Ok(())
    }

    /// Nothing is buffered, so a flush has no work to do and reports clean.
    async fn flush(&self, _caught_up: bool) -> SinkResult<sinks_core::FlushReport> {
        let report = sinks_core::FlushReport::clean();
        Ok(report)
    }
}
/// End-to-end check that row changes made while the engine is running reach
/// the sink.
///
/// Starts a throwaway `postgres:16-alpine` container with logical WAL enabled,
/// creates the `users` table, the `flusso` publication and the `flusso`
/// `pgoutput` replication slot, spawns the engine, and then expects an insert
/// and a delete of row 1 to be recorded as `upsert users 1` and
/// `delete users 1` respectively.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "requires docker"]
async fn wal_changes_flow_through_the_engine() {
    // Server command line: turn on logical WAL and leave headroom for
    // replication senders and slots.
    let server_args = [
        "postgres",
        "-c",
        "wal_level=logical",
        "-c",
        "max_wal_senders=10",
        "-c",
        "max_replication_slots=10",
    ];
    // Bound to a named local so the container lives until the test returns.
    let container = Postgres::default()
        .with_tag("16-alpine")
        .with_cmd(server_args)
        .start()
        .await
        .unwrap();
    let port = container.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let pool = PgPoolOptions::new().connect(&url).await.unwrap();

    // Schema, publication and replication slot, created in this order before
    // the engine is started.
    let bootstrap = [
        "CREATE TABLE users (id int PRIMARY KEY, email text)",
        "CREATE PUBLICATION flusso FOR TABLE users",
        "SELECT pg_create_logical_replication_slot('flusso', 'pgoutput')",
    ];
    for sql in bootstrap {
        sqlx::query(sql).execute(&pool).await.unwrap();
    }

    // NOTE(review): the positional arguments presumably carry host,
    // credentials/database and the slot/publication names — confirm against
    // `ReplicationConfig::new`.
    let replication = ReplicationConfig::new(
        "127.0.0.1",
        "postgres",
        "postgres",
        "postgres",
        "flusso",
        "flusso",
    )
    .with_port(port);
    let spec = Arc::new(users_spec());
    let builder = PgDocumentBuilder::connect(&url, spec).await.unwrap();
    let documents = Arc::new(builder);

    // The test keeps `log`; the sink gets a second handle to the same vector.
    let log = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::new(RecordingSink {
        ops: Arc::clone(&log),
    });

    let capture = Arc::new(WalChangeCapture::new(replication, url.clone()));
    let pipeline = Engine::new(capture, documents, sink);
    let mut task = tokio::spawn(pipeline.run());

    // Each statement is executed and its resulting sink operation awaited
    // before moving on to the next one.
    let scenario = [
        (
            "INSERT INTO users (id, email) VALUES (1, 'ada@x.io')",
            "upsert users 1",
        ),
        ("DELETE FROM users WHERE id = 1", "delete users 1"),
    ];
    for (sql, expected) in scenario {
        sqlx::query(sql).execute(&pool).await.unwrap();
        expect_op(&mut task, &log, expected).await;
    }

    task.abort();
}
/// End-to-end check that rows already present before the engine starts are
/// delivered to the sink.
///
/// Starts a throwaway `postgres:16-alpine` container with logical WAL enabled,
/// creates the `users` table and `flusso` publication, inserts two rows, and
/// only then creates the `flusso` `pgoutput` slot and spawns the engine. Both
/// rows must be recorded as upserts without any further writes to the table.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "requires docker"]
async fn backfill_seeds_preexisting_rows() {
    // Server command line: turn on logical WAL and leave headroom for
    // replication senders and slots.
    let server_args = [
        "postgres",
        "-c",
        "wal_level=logical",
        "-c",
        "max_wal_senders=10",
        "-c",
        "max_replication_slots=10",
    ];
    // Bound to a named local so the container lives until the test returns.
    let container = Postgres::default()
        .with_tag("16-alpine")
        .with_cmd(server_args)
        .start()
        .await
        .unwrap();
    let port = container.get_host_port_ipv4(5432).await.unwrap();
    let url = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let pool = PgPoolOptions::new().connect(&url).await.unwrap();

    // The rows are inserted before the replication slot exists.
    let bootstrap = [
        "CREATE TABLE users (id int PRIMARY KEY, email text)",
        "CREATE PUBLICATION flusso FOR TABLE users",
        "INSERT INTO users (id, email) VALUES (1, 'ada@x.io'), (2, 'grace@x.io')",
        "SELECT pg_create_logical_replication_slot('flusso', 'pgoutput')",
    ];
    for sql in bootstrap {
        sqlx::query(sql).execute(&pool).await.unwrap();
    }

    // NOTE(review): the positional arguments presumably carry host,
    // credentials/database and the slot/publication names — confirm against
    // `ReplicationConfig::new`.
    let replication = ReplicationConfig::new(
        "127.0.0.1",
        "postgres",
        "postgres",
        "postgres",
        "flusso",
        "flusso",
    )
    .with_port(port);
    let spec = Arc::new(users_spec());
    let builder = PgDocumentBuilder::connect(&url, spec).await.unwrap();
    let documents = Arc::new(builder);

    // The test keeps `log`; the sink gets a second handle to the same vector.
    let log = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::new(RecordingSink {
        ops: Arc::clone(&log),
    });

    let capture = Arc::new(WalChangeCapture::new(replication, url.clone()));
    let pipeline = Engine::new(capture, documents, sink);
    let mut task = tokio::spawn(pipeline.run());

    // No writes happen after startup; both upserts must come from the rows
    // that were already in the table.
    for expected in ["upsert users 1", "upsert users 2"] {
        expect_op(&mut task, &log, expected).await;
    }

    task.abort();
}
/// Waits until `op` shows up in `recorded`, failing fast if the engine task
/// finishes first.
///
/// Races the engine's join handle against [`poll_for`]. If the handle resolves
/// (for any reason) before the operation is seen, the test panics with the
/// engine's result; otherwise the function returns once the operation has been
/// recorded. The handle is only borrowed, so it can be reused by later calls.
async fn expect_op(
    engine: &mut tokio::task::JoinHandle<engine::Result<()>>,
    recorded: &Arc<Mutex<Vec<String>>>,
    op: &str,
) {
    tokio::select! {
        result = &mut *engine => {
            // The engine is expected to keep running for the whole test.
            panic!("engine stopped before producing `{op}`: {result:?}")
        }
        () = poll_for(recorded, op) => {}
    }
}
/// Polls `recorded` every 100 ms until it contains an entry equal to `op`.
///
/// # Panics
///
/// Panics with the operations recorded so far if `op` has not appeared within
/// 30 seconds, or if the mutex is poisoned.
async fn poll_for(recorded: &Arc<Mutex<Vec<String>>>, op: &str) {
    let started = Instant::now();
    let budget = Duration::from_secs(30);
    loop {
        // The guard is a temporary of this statement, so the lock is released
        // before the sleep below is awaited.
        let seen = recorded
            .lock()
            .unwrap()
            .iter()
            .any(|entry| entry.as_str() == op);
        if seen {
            return;
        }
        if started.elapsed() >= budget {
            panic!(
                "timed out waiting for `{op}`; recorded so far: {:?}",
                recorded.lock().unwrap()
            );
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
/// Builds the source spec shared by the tests: a single `users` index backed
/// by the `users` table in the `public` schema, keyed by the `id` column and
/// exposing the `id` and `email` columns as same-named fields.
fn users_spec() -> SourceSpec {
    let fields = vec![column_field("id", "id"), column_field("email", "email")];
    let users = IndexSchema {
        version: 1,
        table: table("users"),
        db_schema: DatabaseSchema::try_new("public").unwrap(),
        primary_key: Some(column("id")),
        doc_id: None,
        soft_delete: None,
        filters: None,
        fields,
    };

    let mut indexes = BTreeMap::new();
    indexes.insert(index_name("users"), users);
    SourceSpec::new(indexes)
}
/// Builds a document field called `name` that is read straight from the table
/// column `col`.
///
/// The column is declared as a nullable `FlussoType::Keyword` with no
/// transforms and no default; field options are left at their defaults.
fn column_field(name: &str, col: &str) -> Field {
    let backing = Column {
        column: column(col),
        ty: FlussoType::Keyword,
        nullable: true,
        transforms: Vec::new(),
        default: None,
    };
    Field {
        field: field(name),
        options: Default::default(),
        source: FieldSource::Column(backing),
    }
}
/// Shorthand for building a [`FieldName`] from a literal.
///
/// Panics if `FieldName::try_new` rejects `name`.
fn field(name: &str) -> FieldName {
    let parsed = FieldName::try_new(name);
    parsed.unwrap()
}
/// Shorthand for building a [`ColumnName`] from a literal.
///
/// Panics if `ColumnName::try_new` rejects `name`.
fn column(name: &str) -> ColumnName {
    let parsed = ColumnName::try_new(name);
    parsed.unwrap()
}
/// Shorthand for building a [`TableName`] from a literal.
///
/// Panics if `TableName::try_new` rejects `name`.
fn table(name: &str) -> TableName {
    let parsed = TableName::try_new(name);
    parsed.unwrap()
}
/// Shorthand for building an [`IndexName`] from a literal.
///
/// Panics if `IndexName::try_new` rejects `name`.
fn index_name(name: &str) -> IndexName {
    let parsed = IndexName::try_new(name);
    parsed.unwrap()
}