use pgrx::bgworkers::{BackgroundWorker, BackgroundWorkerBuilder, BgWorkerStartTime, SignalWakeFlags};
use pgrx::prelude::*;
use std::collections::HashMap;
use std::time::{Duration, Instant};
mod apply;
mod config;
mod failover;
mod raft_node;
mod rpc;
mod rtype;
mod state;
mod store;
pgrx::pg_module_magic!();
/// Extension load hook.
///
/// Does nothing unless the library is being loaded while
/// `process_shared_preload_libraries_in_progress` is set. In that case it
/// initialises the extension configuration and registers the
/// "pg_replica supervisor" background worker, whose entry point is the
/// exported symbol `pg_replica_supervisor_main` in library `pg_replica`.
#[pg_guard]
pub extern "C-unwind" fn _PG_init() {
    // Reading a static exported through `pg_sys` requires `unsafe`.
    let preloading = unsafe { pg_sys::process_shared_preload_libraries_in_progress };
    if preloading {
        config::init();

        // Restart interval of 5 seconds; the worker is started once the
        // server reports `ConsistentState`.
        let restart_after = Some(Duration::from_secs(5));
        let supervisor = BackgroundWorkerBuilder::new("pg_replica supervisor")
            .set_function("pg_replica_supervisor_main")
            .set_library("pg_replica")
            .set_restart_time(restart_after)
            .set_start_time(BgWorkerStartTime::ConsistentState);
        supervisor.load();
    }
}
/// Entry point of the "pg_replica supervisor" background worker (the symbol
/// name is what `_PG_init` passes to `set_function`).
///
/// Outline:
/// 1. Install SIGHUP/SIGTERM handlers and read the configuration.
/// 2. If `node_id` is 0 or no peers are configured, idle on the latch and exit.
/// 3. Optionally spawn the watchdog script, then start the raft handle.
/// 4. Loop on `wait_latch` with a 100 ms timeout. Each iteration:
///    - writes the heartbeat file and drains incoming gossip,
///    - adopts a newer decision from the raft handle,
///    - gossips the local WAL LSN every `gossip_every` ticks,
///    - when this node is raft leader, proposes a primary if one is needed,
///    - applies the local consequences through SQL run by `apply::run_sql`
///      (promote / repoint while in recovery; fence / unfence / synchronous
///      standby names / rejoin otherwise),
///    - records a status snapshot whenever it changes.
/// 5. When the loop ends, shut the raft handle down and record "stopped".
///
/// `_arg` is unused.
#[pg_guard]
#[unsafe(no_mangle)]
pub extern "C-unwind" fn pg_replica_supervisor_main(_arg: pg_sys::Datum) {
BackgroundWorker::attach_signal_handlers(SignalWakeFlags::SIGHUP | SignalWakeFlags::SIGTERM);
// Export the configured password file as PGPASSFILE in this process's
// environment; presumably so the psql child processes spawned by `apply`
// inherit it — confirm in `apply`.
// NOTE(review): `std::env::set_var` is documented as unsound when other
// threads read the environment and is `unsafe` in the Rust 2024 edition —
// confirm the crate edition and that no other threads exist at this point.
let auth_passfile = config::passfile();
if !auth_passfile.is_empty() {
std::env::set_var("PGPASSFILE", &auth_passfile);
}
// NOTE(review): `as` casts wrap/truncate silently; presumably the settings in
// `config` are range-limited so these cannot lose information — verify.
let node_id = config::node_id() as u64;
let port = config::raft_port() as u16;
let peers = rpc::parse_peers(&config::peers());
// Every configured peer id is treated as a voter.
let voters: Vec<u64> = peers.iter().map(|peer| peer.id).collect();
pgrx::log!(
"pg_replica: supervisor started (node_id={}, raft_port={}, voters={:?})",
node_id,
port,
voters
);
// Unconfigured node: do no work, just wait on the latch (10 s timeout) until
// `wait_latch` returns false, then exit.
if node_id == 0 || voters.is_empty() {
pgrx::log!("pg_replica: not configured (node_id/peers unset); idling");
while BackgroundWorker::wait_latch(Some(Duration::from_secs(10))) {}
return;
}
// `pg_addrs` is parsed with the same parser as `peers`; it maps node ids to
// the PostgreSQL addresses used for SQL and replication below.
let pg_members = rpc::parse_peers(&config::pg_addrs());
let psql = config::psql();
let pgbin = apply::parent_dir(&psql);
let rejoin_script = config::rejoin_script();
let passfile = config::passfile();
// This node's own PostgreSQL host/port. If no `pg_members` entry has our id,
// `split_host_port` is handed an empty string.
let (my_host, my_port) = apply::split_host_port(
&pg_members
.iter()
.find(|member| member.id == node_id)
.map(|member| member.addr.clone())
.unwrap_or_default(),
);
// Strict majority of the voter set, and the number of standbys that must
// acknowledge (majority minus this node).
let majority = voters.len() / 2 + 1;
let sync_quorum = majority.saturating_sub(1);
// Value for synchronous_standby_names: "ANY k (nodeA, nodeB, ...)" listing
// every other voter as "node<id>" — the same application_name format used in
// the primary_conninfo built further down. Empty when no standby ack is
// required (sync_quorum == 0, e.g. a single voter).
let sync_target = if sync_quorum >= 1 {
let names: Vec<String> = voters
.iter()
.filter(|&&voter| voter != node_id)
.map(|voter| format!("node{}", voter))
.collect();
format!("ANY {} ({})", sync_quorum, names.join(", "))
} else {
String::new()
};
let synchronous = config::synchronous();
let raft_dir = config::raft_dir();
// Heartbeat file rewritten on every loop iteration; its path is also passed to
// the watchdog script.
let heartbeat_file = format!("{}/pg_replica_hb_{}", raft_dir, node_id);
let watchdog_script = config::watchdog_script();
if !watchdog_script.is_empty() {
if apply::spawn_watchdog(&watchdog_script, &psql, &my_host, &my_port, &heartbeat_file, node_id)
{
pgrx::log!("pg_replica: node {} deadman watchdog spawned", node_id);
}
}
// Clamp the compaction threshold to at least 1 before the cast.
let compact_threshold = config::compact_threshold().max(1) as u64;
let raft_peers: Vec<(u64, String)> =
peers.iter().map(|peer| (peer.id, peer.addr.clone())).collect();
// Start the raft node; on failure log and leave the worker function.
let handle =
match raft_node::RaftHandle::start(node_id, port, &raft_peers, &raft_dir, compact_threshold)
{
Ok(handle) => handle,
Err(error) => {
pgrx::log!("pg_replica: raft start failed on {}: {}", port, error);
return;
}
};
// NOTE(review): presumably initialises cluster membership; the semantics live
// in `raft_node` and are not visible here.
handle.bootstrap();
pgrx::log!(
"pg_replica: node {} openraft started (raft_port={}, members={:?})",
node_id,
port,
voters
);
// ---- Loop state -----------------------------------------------------------
// Last status snapshot written, used to log/write only on change.
let mut last = String::new();
// node id -> (WAL LSN, in_recovery, time recorded); includes this node.
let mut peers_lsn: HashMap<u64, (u64, bool, Instant)> = HashMap::new();
// node id -> time of the last gossip message received (decodable or not).
let mut last_heard: HashMap<u64, Instant> = HashMap::new();
// node id -> `reconfirm` flag carried by that node's latest decoded gossip.
let mut peers_reconfirm: HashMap<u64, bool> = HashMap::new();
// Set after a loop stall; gossiped to peers, blocks local authorization, and
// is cleared when a newer decision is adopted.
let mut reconfirm_pending = false;
// Latest decision adopted from the raft handle.
let mut decided: Option<failover::Decision> = handle.current_decision();
// (seq, primary) of the last proposal that `propose` accepted.
let mut last_proposed: (u64, u64) = (0, 0);
// Per-decision flags: promotion requested / rejoin script spawned.
let mut promoted = false;
let mut rejoining = false;
// Primary id that primary_conninfo was last pointed at (0 = none yet).
let mut applied_primary: u64 = 0;
// Last values known to be applied; `None` means unknown and forces re-apply.
let mut applied_read_only: Option<bool> = None;
let mut applied_sync: Option<String> = None;
// Start of the current uninterrupted authorization period.
let mut authorized_since: Option<Instant> = None;
let mut ticks: u64 = 0;
// Gossip the local LSN every 5th iteration.
let gossip_every: u64 = 5;
// Leader view: a peer whose gossip is older than this is not "live".
let dead_timeout = Duration::from_millis(2500);
// How long authorization must hold before a fenced node is unfenced.
let confirm_window = Duration::from_millis(1500);
// A peer counts towards quorum only if heard from within this window.
let quorum_lease = Duration::from_millis(1200);
// Gap between iterations that is treated as a stall of this worker.
let stall_threshold = Duration::from_millis(1500);
let mut last_loop = Instant::now();
// Main loop: wakes on the latch or after 100 ms and ends when `wait_latch`
// returns false (per the pgrx documentation, once SIGTERM has been received).
while BackgroundWorker::wait_latch(Some(Duration::from_millis(100))) {
// Stall detection: if the previous iteration was too long ago, forget the
// applied read-only state and the authorization timer, and ask for the
// current decision to be reconfirmed.
let loop_now = Instant::now();
if loop_now.duration_since(last_loop) > stall_threshold {
applied_read_only = None;
authorized_since = None;
reconfirm_pending = true;
}
last_loop = loop_now;
apply::write_heartbeat(&heartbeat_file);
// Drain all queued gossip. Any message refreshes `last_heard`; LSN and
// reconfirm data are updated only when the payload decodes.
while let Some((from, payload)) = handle.try_recv_gossip() {
last_heard.insert(from, Instant::now());
if let Some(gossip) = failover::decode_gossip(&payload) {
peers_lsn.insert(from, (gossip.lsn, gossip.in_recovery, Instant::now()));
peers_reconfirm.insert(from, gossip.reconfirm);
}
}
// Adopt a decision only if there is none yet or its seq is strictly higher,
// and reset everything that is tracked per decision.
if let Some(decision) = handle.current_decision() {
if decided.map_or(true, |current| decision.seq > current.seq) {
decided = Some(decision);
promoted = false;
rejoining = false;
applied_primary = 0;
reconfirm_pending = false;
pgrx::log!(
"pg_replica: node {} DECISION seq={} primary={}",
node_id,
decision.seq,
decision.primary
);
}
}
// Whether the local server is currently in recovery.
let in_recovery = unsafe { pg_sys::RecoveryInProgress() };
ticks += 1;
// Periodically record and broadcast our own LSN, recovery state and
// reconfirm request.
if ticks % gossip_every == 0 {
let lsn = apply::wal_lsn(&psql, &my_host, &my_port, in_recovery);
peers_lsn.insert(node_id, (lsn, in_recovery, Instant::now()));
let payload = failover::encode_gossip(lsn, in_recovery, reconfirm_pending);
handle.gossip_broadcast(node_id, &payload);
}
// ---- Leader only: decide whether a (new) primary must be proposed ---------
if handle.is_leader() {
let now = Instant::now();
// Live members: this node (once it has an entry) plus every peer whose
// gossip is newer than `dead_timeout`.
let live: Vec<(u64, u64, bool)> = peers_lsn
.iter()
.filter(|(id, (_, _, seen))| {
**id == node_id || now.duration_since(*seen) < dead_timeout
})
.map(|(id, (lsn, recovery, _))| (*id, *lsn, *recovery))
.collect();
let current_primary = decided.map(|decision| decision.primary).unwrap_or(0);
let primary_alive =
current_primary != 0 && live.iter().any(|candidate| candidate.0 == current_primary);
// A reconfirmation is wanted if this node or any live member asked for one.
let needs_reconfirm = reconfirm_pending
|| live
.iter()
.any(|member| peers_reconfirm.get(&member.0).copied().unwrap_or(false));
// Candidate selection:
//  - no decision yet: the lowest-id live member that is not in recovery;
//  - decided primary not live: `failover::choose_primary` over freshly
//    queried LSNs (falling back to the gossiped LSN);
//  - reconfirm requested: re-propose the current primary under a new seq;
//  - otherwise nothing to propose.
// NOTE(review): the LSN probing below runs before the majority check and on
// every iteration while the primary stays dead, including when the same
// proposal was already accepted — consider gating it if `peer_wal_lsn` is
// expensive.
let candidate = if decided.is_none() {
live.iter()
.filter(|member| !member.2)
.map(|member| member.0)
.min()
} else if !primary_alive {
let fresh: Vec<(u64, u64, bool)> = live
.iter()
.map(|&(id, gossiped, in_rec)| {
let lsn = if id == node_id {
apply::wal_lsn(&psql, &my_host, &my_port, in_recovery)
} else if let Some(member) =
pg_members.iter().find(|member| member.id == id)
{
let (host, port) = apply::split_host_port(&member.addr);
apply::peer_wal_lsn(&psql, &host, &port).unwrap_or(gossiped)
} else {
gossiped
};
(id, lsn, in_rec)
})
.collect();
failover::choose_primary(&fresh)
} else if needs_reconfirm {
Some(current_primary)
} else {
None
};
// Propose only when a majority of voters is live. The new seq is the current
// one plus 1 (1 when there is no decision). `last_proposed` is updated only
// when `propose` returns true, so a rejected proposal is retried next
// iteration.
if live.len() >= majority {
if let Some(candidate) = candidate {
let seq = decided.map(|decision| decision.seq).unwrap_or(0) + 1;
if last_proposed != (seq, candidate) {
let decision = failover::Decision {
seq,
primary: candidate,
};
if handle.propose(decision) {
last_proposed = (seq, candidate);
pgrx::log!(
"pg_replica: node {} PROPOSE seq={} primary={} live={:?}",
node_id,
seq,
candidate,
live
);
}
}
}
}
}
// ---- Every node: apply the decision locally --------------------------------
let decided_primary = decided.map(|decision| decision.primary).unwrap_or(0);
// Quorum check from this node's point of view: itself plus every peer heard
// from within `quorum_lease` must reach the majority.
let contact_now = Instant::now();
let reachable = last_heard
.iter()
.filter(|(id, seen)| {
**id != node_id && contact_now.duration_since(**seen) < quorum_lease
})
.count();
let quorum_ok = 1 + reachable >= majority;
if in_recovery {
// Server is in recovery (standby). Read-only state and the authorization
// timer are unknown from here on, so the non-recovery branch re-evaluates
// them from scratch after a promotion.
applied_read_only = None;
authorized_since = None;
// Clear synchronous_standby_names once while in recovery.
if applied_sync.as_deref() != Some("") {
if apply::run_sql(
&psql,
&my_host,
&my_port,
"ALTER SYSTEM SET synchronous_standby_names = ''",
)
.is_ok()
{
let _ = apply::run_sql(&psql, &my_host, &my_port, "SELECT pg_reload_conf()");
applied_sync = Some(String::new());
}
}
if decided_primary == node_id {
// This standby was chosen as primary: set the read-only fence, reload, then
// request promotion. Per the PostgreSQL docs the argument of `pg_promote`
// is `wait`, so `false` should return without waiting for promotion to
// complete; `promoted` only records that the request succeeded.
// NOTE(review): the results of the fence statement and the reload are
// ignored, so promotion proceeds even if fencing failed. The non-recovery
// branch re-applies the fence on a later iteration (applied_read_only is
// None), but there may be a window in between — confirm this is acceptable.
if !promoted {
pgrx::log!(
"pg_replica: node {} APPLY promote (standby -> primary, fenced until authorized)",
node_id
);
let _ = apply::run_sql(
&psql,
&my_host,
&my_port,
"ALTER SYSTEM SET default_transaction_read_only = on",
);
let _ = apply::run_sql(&psql, &my_host, &my_port, "SELECT pg_reload_conf()");
match apply::run_sql(&psql, &my_host, &my_port, "SELECT pg_promote(false)") {
Ok(_) => promoted = true,
Err(error) => {
pgrx::log!("pg_replica: node {} promote failed: {}", node_id, error)
}
}
}
} else if decided_primary != 0 && applied_primary != decided_primary {
// Another node is primary and primary_conninfo has not been pointed at it
// yet: rewrite primary_conninfo and reload. `applied_primary` is recorded
// only when ALTER SYSTEM succeeds, so failures are retried.
// NOTE(review): `conninfo` is interpolated into a single-quoted SQL literal
// without escaping, and `passfile` is not quoted inside the conninfo — a
// quote or space in these configured values would break the statement.
// Confirm the values are trusted/validated.
if let Some(member) = pg_members.iter().find(|member| member.id == decided_primary) {
let (primary_host, primary_port) = apply::split_host_port(&member.addr);
let passfile_kw = if passfile.is_empty() {
String::new()
} else {
format!(" passfile={}", passfile)
};
let conninfo = format!(
"host={} port={} user=replicator{} application_name=node{}",
primary_host, primary_port, passfile_kw, node_id
);
pgrx::log!(
"pg_replica: node {} APPLY repoint standby -> primary {} ({})",
node_id,
decided_primary,
member.addr
);
if apply::run_sql(
&psql,
&my_host,
&my_port,
&format!("ALTER SYSTEM SET primary_conninfo = '{}'", conninfo),
)
.is_ok()
{
let _ = apply::run_sql(&psql, &my_host, &my_port, "SELECT pg_reload_conf()");
applied_primary = decided_primary;
}
}
}
} else {
// Server is not in recovery (accepting writes unless fenced).
// Raw authorization: with no decision at all the node is allowed to write;
// otherwise it must be the decided primary, see a quorum, and have no
// reconfirmation pending.
let raw_auth = match decided {
None => true,
Some(decision) => {
decision.primary == node_id && quorum_ok && !reconfirm_pending
}
};
// Effective authorization:
//  - not raw-authorized: false, and the timer is reset;
//  - no decision yet, or already unfenced: true immediately;
//  - otherwise (fenced or unknown): true only after raw authorization has
//    held continuously for `confirm_window`.
let authorized = if !raw_auth {
authorized_since = None;
false
} else if decided.is_none() || applied_read_only == Some(false) {
authorized_since = Some(Instant::now());
true
} else {
let since = *authorized_since.get_or_insert_with(Instant::now);
Instant::now().duration_since(since) >= confirm_window
};
// Desired synchronous_standby_names: empty when synchronous mode is off, the
// quorum target when authorized under a decision, otherwise leave untouched.
let want_sync: Option<&str> = if !synchronous {
Some("")
} else if authorized && decided.is_some() {
Some(sync_target.as_str())
} else {
None
};
let want_read_only = !authorized;
// Order matters below: fence first, then synchronous standby names, then
// unfence — so the standby list is in place before writes are enabled.
if want_read_only && applied_read_only != Some(true) {
if apply::run_sql(
&psql,
&my_host,
&my_port,
"ALTER SYSTEM SET default_transaction_read_only = on",
)
.is_ok()
{
let _ = apply::run_sql(&psql, &my_host, &my_port, "SELECT pg_reload_conf()");
applied_read_only = Some(true);
pgrx::log!(
"pg_replica: node {} FENCE -> read-only (decided_primary={} quorum_ok={})",
node_id,
decided_primary,
quorum_ok
);
}
}
// NOTE(review): `want_sync` is interpolated into a SQL literal unescaped; it
// is built from numeric ids above, so this looks safe — confirm.
if let Some(want_sync) = want_sync {
if applied_sync.as_deref() != Some(want_sync) {
if apply::run_sql(
&psql,
&my_host,
&my_port,
&format!("ALTER SYSTEM SET synchronous_standby_names = '{}'", want_sync),
)
.is_ok()
{
let _ = apply::run_sql(&psql, &my_host, &my_port, "SELECT pg_reload_conf()");
applied_sync = Some(want_sync.to_string());
pgrx::log!(
"pg_replica: node {} synchronous_standby_names = '{}'",
node_id,
want_sync
);
}
}
}
if !want_read_only && applied_read_only != Some(false) {
if apply::run_sql(
&psql,
&my_host,
&my_port,
"ALTER SYSTEM SET default_transaction_read_only = off",
)
.is_ok()
{
let _ = apply::run_sql(&psql, &my_host, &my_port, "SELECT pg_reload_conf()");
applied_read_only = Some(false);
pgrx::log!(
"pg_replica: node {} UNFENCE -> read-write (decided_primary={} quorum_ok={})",
node_id,
decided_primary,
quorum_ok
);
}
}
// This node is out of recovery but a different node is the decided primary:
// spawn the rejoin script once per decision (`rejoining` is reset when a
// newer decision is adopted). Skipped when no script is configured or the
// data directory cannot be determined.
if !authorized
&& decided_primary != 0
&& decided_primary != node_id
&& !rejoining
&& !rejoin_script.is_empty()
{
if let Some(member) = pg_members.iter().find(|member| member.id == decided_primary) {
let (primary_host, primary_port) = apply::split_host_port(&member.addr);
let datadir = apply::run_sql(&psql, &my_host, &my_port, "SHOW data_directory")
.unwrap_or_default();
if !datadir.is_empty() {
pgrx::log!(
"pg_replica: node {} REJOIN spawn (rewind against primary {} {})",
node_id,
decided_primary,
member.addr
);
apply::spawn_rejoin(
&rejoin_script,
&pgbin,
&datadir,
&primary_host,
&primary_port,
node_id,
&passfile,
);
rejoining = true;
}
}
}
}
// ---- Status snapshot --------------------------------------------------------
// Replication role as implied by the decision (not by the actual server state).
let repl = if decided_primary == node_id {
String::from("primary")
} else if decided_primary != 0 {
match pg_members.iter().find(|member| member.id == decided_primary) {
Some(member) => format!("standby<-{}", member.addr),
None => String::from("standby<-?"),
}
} else {
String::from("bootstrap")
};
let snapshot = format!(
"{} term={} leader={} decided_primary={} seq={} quorum={} read_only={} reconfirm={} | repl={} in_recovery={}",
handle.role_name(),
handle.term(),
handle.leader_id(),
decided_primary,
decided.map(|decision| decision.seq).unwrap_or(0),
quorum_ok,
applied_read_only == Some(true),
reconfirm_pending,
repl,
in_recovery
);
// Log and persist the snapshot only when it differs from the previous one.
if snapshot != last {
pgrx::log!("pg_replica: node {} -> {}", node_id, snapshot);
state::write(node_id, &snapshot);
last = snapshot;
}
}
// Loop ended: stop the raft node and record the final state.
handle.shutdown();
state::write(node_id, "stopped");
pgrx::log!("pg_replica: supervisor shutting down");
}
#[pg_schema]
mod replica {
    use pgrx::prelude::*;

    /// Returns a one-line status string for this node.
    ///
    /// The line contains the configured node id, raft port and peer list,
    /// followed by whatever `crate::state::read` returns for this node id,
    /// or the text "consensus not started" when it returns nothing.
    #[pg_extern]
    fn status() -> String {
        let id = crate::config::node_id();
        let snapshot = match crate::state::read(id as u64) {
            Some(text) => text,
            None => String::from("consensus not started"),
        };
        let port = crate::config::raft_port();
        let peer_list = crate::config::peers();
        format!(
            "pg_replica node_id={} raft_port={} peers=[{}] | {}",
            id, port, peer_list, snapshot
        )
    }
}
#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
    use pgrx::prelude::*;

    /// Calls `replica.status()` through SPI and checks that the result
    /// mentions the node id field.
    #[pg_test]
    fn status_is_reported() {
        let row = Spi::get_one::<String>("SELECT replica.status()").expect("SPI failed");
        let text = row.expect("status() returned NULL");
        assert!(text.contains("node_id="));
    }
}
#[cfg(test)]
pub mod pg_test {
    /// Setup hook (presumably invoked by the pgrx test harness — confirm);
    /// intentionally does nothing.
    pub fn setup(_options: Vec<&str>) {}

    /// Configuration lines for the test server: preload `pg_replica` so that
    /// the library is loaded through `shared_preload_libraries`.
    pub fn postgresql_conf_options() -> Vec<&'static str> {
        let preload: &'static str = "shared_preload_libraries = 'pg_replica'";
        Vec::from([preload])
    }
}