#![allow(dead_code)]
use std::time::Duration;
use umbral::db::{Alias, DatabaseRouter, RouteContext};
use umbral::migrate::ModelMeta;
// Row type for the `repl_note` table; registered with umbral via `.model::<RNote>()` in the
// test below and used for every ORM-level read and write the test performs.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize, serde::Deserialize, umbral::orm::Model)]
#[umbral(table = "repl_note")]
pub struct RNote {
// Matches the `id BIGSERIAL PRIMARY KEY` column created by the test. The test always passes
// `id: 0` when creating rows — presumably a placeholder the database replaces; verify against
// umbral's `create` semantics.
pub id: i64,
// Matches the `body TEXT NOT NULL` column; the test stores "alpha" and "beta" here.
pub body: String,
}
/// Router under test: it ignores the model and the route context and always
/// answers with a fixed alias — one for reads, another for writes.
struct ReplicaRouter;

impl ReplicaRouter {
    /// Alias handed back for every read.
    const READ_ALIAS: &'static str = "replica";
    /// Alias handed back for every write.
    const WRITE_ALIAS: &'static str = "default";
}

impl DatabaseRouter for ReplicaRouter {
    /// Sends every read to the `"replica"` alias.
    fn db_for_read(&self, _model: &ModelMeta, _ctx: &RouteContext) -> Alias {
        Alias::new(Self::READ_ALIAS)
    }

    /// Sends every write to the `"default"` alias.
    fn db_for_write(&self, _model: &ModelMeta, _ctx: &RouteContext) -> Alias {
        Alias::new(Self::WRITE_ALIAS)
    }
}
/// Returns how many rows of `repl_note` are visible through `pool`.
///
/// Talks to the pool directly with sqlx, so the answer reflects that specific
/// server rather than whatever the umbral router would pick.
///
/// # Panics
///
/// Panics with the message `count` if the query fails.
async fn count(pool: &sqlx::PgPool) -> i64 {
    let query = sqlx::query_scalar::<_, i64>("SELECT count(*) FROM repl_note");
    let outcome = query.fetch_one(pool).await;
    outcome.expect("count")
}
/// Polls the `repl_note` row count on `pool` until `done` accepts it.
///
/// The count is sampled up to 100 times with a 100 ms pause after each
/// rejected sample. `what` is only used to label the timeout panic.
///
/// # Panics
///
/// Panics with `timed out waiting for: {what}` once the poll budget is spent,
/// and propagates the panic from `count` if a sample query fails.
async fn wait_until<F>(pool: &sqlx::PgPool, mut done: F, what: &str)
where
    F: FnMut(i64) -> bool,
{
    const MAX_POLLS: u32 = 100;
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let mut remaining = MAX_POLLS;
    while remaining > 0 {
        let observed = count(pool).await;
        if done(observed) {
            return;
        }
        tokio::time::sleep(POLL_INTERVAL).await;
        remaining -= 1;
    }
    panic!("timed out waiting for: {what}");
}
/// Waits until the `repl_note` table created on the primary is visible on `replica`.
///
/// Polls `to_regclass('public.repl_note')` up to 100 times, 100 ms apart. A failed
/// query is treated the same as "not visible yet" so a transient error only costs
/// one poll.
///
/// # Panics
///
/// Panics with an explicit timeout message if the table never shows up, instead of
/// letting the test continue and fail later on an unrelated "relation does not
/// exist" error.
async fn wait_for_table_on_replica(replica: &sqlx::PgPool) {
    for _ in 0..100 {
        let visible =
            sqlx::query_scalar::<_, Option<String>>("SELECT to_regclass('public.repl_note')::text")
                .fetch_one(replica)
                .await
                .ok()
                .flatten()
                .is_some();
        if visible {
            return;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    panic!("timed out waiting for: repl_note table replicated to the replica");
}

/// End-to-end check of umbral's read/write split against a real streaming replica.
///
/// Flow:
/// 1. Connect to both servers and verify the replica really is in recovery.
/// 2. Recreate `repl_note` on the primary and wait for the DDL to reach the replica.
/// 3. Build the umbral app with `ReplicaRouter` (reads → "replica", writes → "default").
/// 4. Write "alpha", wait for it to replicate, then pause WAL replay on the replica.
/// 5. Write "beta": the primary has two rows while the frozen replica still has one.
/// 6. Assert `get_or_create` finds "beta" without inserting a duplicate, while plain
///    `fetch` / `count` observe only the stale replica state.
/// 7. Resume replay, wait for catch-up, and assert both rows are read.
///
/// Ignored by default: it needs `UMBRAL_PRIMARY_URL` and `UMBRAL_REPLICA_URL` to point
/// at a live primary/standby pair.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "needs a live Postgres primary + streaming replica (UMBRAL_PRIMARY_URL / UMBRAL_REPLICA_URL)"]
async fn read_write_split_against_real_streaming_replica() {
    // --- Connections and sanity checks -------------------------------------------------
    let primary_url = std::env::var("UMBRAL_PRIMARY_URL").expect("UMBRAL_PRIMARY_URL");
    let replica_url = std::env::var("UMBRAL_REPLICA_URL").expect("UMBRAL_REPLICA_URL");
    let primary = sqlx::PgPool::connect(&primary_url).await.expect("primary");
    let replica = sqlx::PgPool::connect(&replica_url).await.expect("replica");

    let in_recovery: bool = sqlx::query_scalar("SELECT pg_is_in_recovery()")
        .fetch_one(&replica)
        .await
        .expect("recovery check");
    assert!(in_recovery, "UMBRAL_REPLICA_URL must be a read-only standby");

    // Best-effort: the result is deliberately ignored. This test pauses replay further
    // down, so an earlier run that panicked before its own resume would leave the
    // replica frozen; resuming here lets the next run start from a live replica.
    let _ = sqlx::query("SELECT pg_wal_replay_resume()")
        .execute(&replica)
        .await;

    // --- Fresh table on the primary ----------------------------------------------------
    // Best-effort drop of leftovers; a real problem surfaces in the CREATE just below.
    sqlx::query("DROP TABLE IF EXISTS repl_note")
        .execute(&primary)
        .await
        .ok();
    sqlx::query("CREATE TABLE repl_note (id BIGSERIAL PRIMARY KEY, body TEXT NOT NULL)")
        .execute(&primary)
        .await
        .expect("create on primary");

    // The replica is read-only, so the table can only appear there via replication.
    wait_for_table_on_replica(&replica).await;

    // --- umbral app wired to both pools ------------------------------------------------
    let mut settings = umbral::Settings::from_env().expect("settings");
    settings.database_url = primary_url.clone();
    umbral::App::builder()
        .settings(settings)
        .database("default", primary.clone())
        .database("replica", replica.clone())
        .router(ReplicaRouter)
        .model::<RNote>()
        .build()
        .expect("App::build");

    // --- Write alpha, let it replicate, then freeze the replica ------------------------
    RNote::objects()
        .create(RNote {
            id: 0,
            body: "alpha".into(),
        })
        .await
        .expect("create alpha");
    wait_until(&replica, |n| n == 1, "alpha replicated").await;

    sqlx::query("SELECT pg_wal_replay_pause()")
        .execute(&replica)
        .await
        .expect("pause replay");

    // --- Write beta while the replica is frozen ----------------------------------------
    RNote::objects()
        .create(RNote {
            id: 0,
            body: "beta".into(),
        })
        .await
        .expect("create beta");
    assert_eq!(count(&primary).await, 2, "primary has alpha+beta");
    assert_eq!(
        count(&replica).await,
        1,
        "frozen replica still has only alpha"
    );

    // "beta" exists only on the primary at this point, so `was_created == false` shows
    // the lookup half of get_or_create did not consult the stale replica.
    let (_row, was_created) = RNote::objects()
        .get_or_create(
            RNote::BODY.eq("beta"),
            RNote {
                id: 0,
                body: "beta".into(),
            },
        )
        .await
        .expect("get_or_create");
    assert!(!was_created, "read-your-writes: found beta on the primary");
    assert_eq!(
        count(&primary).await,
        2,
        "no duplicate inserted on the primary"
    );

    // --- Plain reads must observe the stale replica ------------------------------------
    let rows = RNote::objects().fetch().await.expect("fetch from replica");
    assert_eq!(
        rows.len(),
        1,
        "umbral read the STALE replica (got {:?}), not the fresh primary",
        rows.iter().map(|r| &r.body).collect::<Vec<_>>()
    );
    assert_eq!(rows[0].body, "alpha");
    assert_eq!(RNote::objects().count().await.expect("count"), 1);

    // --- Unfreeze and verify catch-up --------------------------------------------------
    sqlx::query("SELECT pg_wal_replay_resume()")
        .execute(&replica)
        .await
        .expect("resume replay");
    wait_until(&replica, |n| n == 2, "beta replicated after resume").await;

    let rows = RNote::objects()
        .fetch()
        .await
        .expect("fetch after catch-up");
    assert_eq!(rows.len(), 2, "replica caught up; umbral reads both");

    // --- Cleanup (best-effort) ---------------------------------------------------------
    sqlx::query("DROP TABLE IF EXISTS repl_note")
        .execute(&primary)
        .await
        .ok();

    println!(
        "OK: write→primary; replica frozen via pg_wal_replay_pause; umbral fetch returned the \
         STALE replica state {{alpha}} while the primary held {{alpha,beta}}; read-your-writes \
         found beta on the primary (no duplicate); resume → umbral reads both."
    );
}