#ifndef _WALRECEIVER_H
#define _WALRECEIVER_H
#include <netdb.h>
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "pgtime.h"
#include "port/atomics.h"
#include "replication/logicalproto.h"
#include "replication/walsender.h"
#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "utils/tuplestore.h"
/*
 * Settings that are defined in another translation unit; this header only
 * declares them (two ints and one bool).
 *
 * NOTE(review): by their names these look like the wal_receiver_status_interval,
 * wal_receiver_timeout and hot_standby_feedback configuration parameters --
 * confirm units and defaults at the point of definition.
 */
extern PGDLLIMPORT int wal_receiver_status_interval;
extern PGDLLIMPORT int wal_receiver_timeout;
extern PGDLLIMPORT bool hot_standby_feedback;
/* Size in bytes of the fixed conninfo[] buffer embedded in WalRcvData. */
#define MAXCONNINFO 1024
/*
 * Evaluates to true when EnableHotStandby is set and max_wal_senders is
 * greater than zero.  Both identifiers must be declared wherever the macro is
 * expanded; neither is declared in this header itself.
 *
 * NOTE(review): presumably this decides whether a standby may itself feed WAL
 * to further standbys -- confirm against the callers.
 */
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
/*
 * Life-cycle states of the WAL receiver, stored in WalRcvData.walRcvState.
 *
 * The enumerators carry no explicit values, so they are numbered from 0 in
 * declaration order; WALRCV_STOPPED is therefore the value a zero-filled
 * WalRcvData holds.  Do not reorder them without checking every user.
 *
 * NOTE(review): the exact meaning of each state (in particular the difference
 * between WAITING, RESTARTING and STOPPING) is not visible in this header --
 * confirm against the state machine in the implementation file.
 */
typedef enum
{
WALRCV_STOPPED,
WALRCV_STARTING,
WALRCV_STREAMING,
WALRCV_WAITING,
WALRCV_RESTARTING,
WALRCV_STOPPING,
} WalRcvState;
/*
 * State of the WAL receiver, reached through the WalRcv pointer declared
 * immediately after this struct.
 *
 * NOTE(review): the struct embeds a spinlock (mutex), an atomic counter and a
 * condition variable, which suggests it lives in shared memory and is read by
 * processes other than the receiver -- confirm against WalRcvShmemInit().
 * Which fields "mutex" protects is not visible in this header; check the
 * accessors before reading or writing a field without holding it.
 */
typedef struct
{
/*
 * Process id and current life-cycle state (see WalRcvState).
 * NOTE(review): presumably the pid of the running receiver process.
 */
pid_t pid;
WalRcvState walRcvState;
/* NOTE(review): by its name, signalled when the receiver stops -- confirm. */
ConditionVariable walRcvStoppedCV;
/* NOTE(review): presumably the time the receiver was started. */
pg_time_t startTime;
/*
 * WAL position and timeline at which receiving starts.
 * NOTE(review): presumably filled from RequestXLogStreaming()'s recptr/tli
 * arguments -- confirm.
 */
XLogRecPtr receiveStart;
TimeLineID receiveStartTLI;
/*
 * NOTE(review): by name, the end of WAL already flushed and the timeline it
 * was received on; likely what GetWalRcvFlushRecPtr() reports -- confirm.
 */
XLogRecPtr flushedUpto;
TimeLineID receivedTLI;
/*
 * NOTE(review): presumably the start of the most recently received chunk;
 * GetWalRcvFlushRecPtr() has an out-parameter of the same name.
 */
XLogRecPtr latestChunkStart;
/*
 * Timestamps and WAL end position associated with the latest message.
 * NOTE(review): whether these are sender-side or receiver-side clocks is
 * not visible here -- confirm before comparing them.
 */
TimestampTz lastMsgSendTime;
TimestampTz lastMsgReceiptTime;
XLogRecPtr latestWalEnd;
TimestampTz latestWalEndTime;
/* Connection string, stored inline in a MAXCONNINFO-byte buffer. */
char conninfo[MAXCONNINFO];
/* Sender host (NI_MAXHOST-byte buffer, constant from <netdb.h>) and port. */
char sender_host[NI_MAXHOST];
int sender_port;
/*
 * Replication slot name (NAMEDATALEN-byte buffer) and a flag for temporary
 * slots.  NOTE(review): an empty string presumably means "no slot".
 */
char slotname[NAMEDATALEN];
bool is_temp_slot;
/*
 * NOTE(review): by name, tells readers whether the connection details above
 * are fit to be shown to users -- confirm who sets and clears it.
 */
bool ready_to_display;
/*
 * Pointer to a Latch, not an embedded one.
 * NOTE(review): presumably the receiver's own latch, NULL when none is
 * registered -- confirm lifetime rules before dereferencing.
 */
Latch *latch;
/* Spinlock (slock_t); see the note on coverage in the struct header. */
slock_t mutex;
/*
 * 64-bit atomic, so it can be accessed with the pg_atomic_* API.
 * NOTE(review): presumably the position written but not necessarily flushed,
 * as returned by GetWalRcvWriteRecPtr() -- confirm.
 */
pg_atomic_uint64 writtenUpto;
/*
 * sig_atomic_t flag.
 * NOTE(review): presumably set by WalRcvForceReply() to request an immediate
 * reply from the receiver -- confirm.
 */
sig_atomic_t force_reply;
} WalRcvData;
/* Pointer to the single WalRcvData instance; defined in another file. */
extern PGDLLIMPORT WalRcvData *WalRcv;
/*
 * Options passed (by const pointer) to walrcv_startstreaming().
 *
 * The "proto" union has one member for physical and one for logical
 * streaming.  NOTE(review): the "logical" flag presumably tells which union
 * member is valid (true -> proto.logical, false -> proto.physical) -- confirm
 * against the implementation of walrcv_startstreaming.
 */
typedef struct
{
/* Discriminator flag; see the note above. */
bool logical;
/* Replication slot name.  NOTE(review): presumably NULL when no slot is used. */
char *slotname;
/* WAL position to start streaming from. */
XLogRecPtr startpoint;
union
{
struct
{
/* Timeline that startpoint belongs to. */
TimeLineID startpointTLI;
} physical;
struct
{
/* Logical replication protocol version requested. */
uint32 proto_version;
/* List of publication names.  NOTE(review): element type not visible here. */
List *publication_names;
/* NOTE(review): presumably requests binary transfer format -- confirm. */
bool binary;
/* Streaming option in string form.  NOTE(review): accepted values not visible here. */
char *streaming_str;
/* NOTE(review): presumably enables two-phase commit streaming -- confirm. */
bool twophase;
/* Origin option in string form.  NOTE(review): accepted values not visible here. */
char *origin;
} logical;
} proto;
} WalRcvStreamOptions;
/*
 * Opaque connection handle.  Only the struct tag is declared here; the
 * members are not visible to includers of this header, which can therefore
 * only pass WalReceiverConn pointers around.
 */
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
/*
 * Outcome code stored in WalRcvExecResult.status.
 *
 * Enumerators are numbered from 0 in declaration order, so WALRCV_ERROR is 0.
 *
 * NOTE(review): the names parallel libpq's result statuses (command OK,
 * tuples returned, COPY in/out/both) -- confirm the exact mapping in the
 * implementation of walrcv_exec.
 */
typedef enum
{
WALRCV_ERROR,
WALRCV_OK_COMMAND,
WALRCV_OK_TUPLES,
WALRCV_OK_COPY_IN,
WALRCV_OK_COPY_OUT,
WALRCV_OK_COPY_BOTH,
} WalRcvExecStatus;
/*
 * Result object returned by walrcv_exec() and released with
 * walrcv_clear_result().
 *
 * walrcv_clear_result() treats err, tuplestore and tupledesc as optional:
 * each is released only when non-NULL, and then the struct itself is freed.
 */
typedef struct WalRcvExecResult
{
/* Outcome code; see WalRcvExecStatus. */
WalRcvExecStatus status;
/* NOTE(review): presumably the encoded SQLSTATE of a failure -- confirm encoding. */
int sqlstate;
/* Error message, or NULL.  Freed with pfree() by walrcv_clear_result(). */
char *err;
/* Returned rows, or NULL.  Ended with tuplestore_end() by walrcv_clear_result(). */
Tuplestorestate *tuplestore;
/* Row descriptor, or NULL.  Freed with FreeTupleDesc() by walrcv_clear_result(). */
TupleDesc tupledesc;
} WalRcvExecResult;
/*
 * Function-pointer types for the members of WalReceiverFunctionsType.
 *
 * The comments below describe the signatures only.  Failure behaviour
 * (error return versus raising an error), ownership of returned strings and
 * the meaning of special return values are NOT visible in this header;
 * confirm them against the implementation before relying on them.
 */

/*
 * Open a connection described by conninfo and return its handle.  "err" is a
 * char ** out-parameter.  NOTE(review): presumably receives an error message
 * when the connection cannot be made -- confirm.
 */
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
bool replication,
bool logical,
bool must_use_password,
const char *appname,
char **err);
/* Check a connection string; returns nothing.  NOTE(review): presumably reports problems by raising an error. */
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo,
bool must_use_password);
/* Return a connection string for an open connection.  NOTE(review): ownership of the result not visible here. */
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
/* Report the sender's host and port through the two out-parameters. */
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
char **sender_host,
int *sender_port);
/* Identify the remote system; returns a string and writes a timeline to *primary_tli. */
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
TimeLineID *primary_tli);
/* Extract a database name from a connection string; needs no open connection. */
typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
/* Return the remote server version as an int.  NOTE(review): encoding not visible here. */
typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
/* Fetch the history file of timeline "tli"; name, content and size come back via out-parameters. */
typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
TimeLineID tli,
char **filename,
char **content,
int *size);
/* Start streaming with the given options.  NOTE(review): meaning of a false return not visible here. */
typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
const WalRcvStreamOptions *options);
/* Stop streaming; a timeline is written to *next_tli. */
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
TimeLineID *next_tli);
/*
 * Receive data.  *buffer and *wait_fd are out-parameters.
 * NOTE(review): presumably returns a byte count, with zero/negative values
 * carrying special meaning, and *wait_fd is a socket to wait on -- confirm.
 */
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
char **buffer,
pgsocket *wait_fd);
/* Send nbytes bytes from buffer over the connection. */
typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
const char *buffer,
int nbytes);
/* Create a replication slot with the given properties; a WAL position is written to *lsn. */
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname,
bool temporary,
bool two_phase,
bool failover,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
/* Change the failover property of an existing slot. */
typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
const char *slotname,
bool failover);
/* Return a process id associated with the connection.  NOTE(review): presumably the remote backend's pid. */
typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
/*
 * Run "query" and return a WalRcvExecResult, to be released with
 * walrcv_clear_result().  retTypes points at nRetTypes type OIDs.
 * NOTE(review): presumably the expected column types of the result -- confirm.
 */
typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
const char *query,
const int nRetTypes,
const Oid *retTypes);
/* Close the connection.  NOTE(review): presumably also frees the handle -- confirm. */
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
/*
 * Table of function pointers, one member per walrcv_*_fn type, each named
 * like the walrcv_* macro that calls it.
 *
 * The macros dereference WalReceiverFunctions without a NULL check, so the
 * pointer must refer to a fully populated table before any walrcv_* macro is
 * used.  NOTE(review): where and when the table is installed is not visible
 * in this header.
 */
typedef struct WalReceiverFunctionsType
{
walrcv_connect_fn walrcv_connect;
walrcv_check_conninfo_fn walrcv_check_conninfo;
walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
walrcv_identify_system_fn walrcv_identify_system;
walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
walrcv_server_version_fn walrcv_server_version;
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
walrcv_startstreaming_fn walrcv_startstreaming;
walrcv_endstreaming_fn walrcv_endstreaming;
walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send;
walrcv_create_slot_fn walrcv_create_slot;
walrcv_alter_slot_fn walrcv_alter_slot;
walrcv_get_backend_pid_fn walrcv_get_backend_pid;
walrcv_exec_fn walrcv_exec;
walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;
/* Pointer to the active function table; defined in another file. */
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
/*
 * Convenience wrappers: each walrcv_xxx(...) macro forwards its arguments,
 * unchanged and in the same order, to the member of the same name in
 * *WalReceiverFunctions.
 *
 * - WalReceiverFunctions is dereferenced without a NULL check.
 * - Each macro shares its name with the struct member it calls; this works
 *   because a macro name is not re-expanded inside its own replacement text.
 * - The second parameter of walrcv_exec is spelled "exec" here but "query"
 *   in walrcv_exec_fn; it is the same argument.
 */
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \
WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password) \
WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_get_conninfo(conn) \
WalReceiverFunctions->walrcv_get_conninfo(conn)
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_identify_system(conn, primary_tli) \
WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
#define walrcv_get_dbname_from_conninfo(conninfo) \
WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_server_version(conn) \
WalReceiverFunctions->walrcv_server_version(conn)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options) \
WalReceiverFunctions->walrcv_startstreaming(conn, options)
#define walrcv_endstreaming(conn, next_tli) \
WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
#define walrcv_receive(conn, buffer, wait_fd) \
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_alter_slot(conn, slotname, failover) \
WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
#define walrcv_get_backend_pid(conn) \
WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn) \
WalReceiverFunctions->walrcv_disconnect(conn)
/*
 * walrcv_clear_result
 *		Release a WalRcvExecResult together with everything it owns.
 *
 * A NULL walres is accepted and ignored.  Otherwise the error string, the
 * tuplestore and the tuple descriptor are each released, but only when
 * present (non-NULL), and finally the result struct itself is freed.  The
 * pointer must not be used after this call.
 */
static inline void
walrcv_clear_result(WalRcvExecResult *walres)
{
	if (walres != NULL)
	{
		/* Optional members: release only the ones that were filled in. */
		if (walres->err != NULL)
		{
			pfree(walres->err);
		}

		if (walres->tuplestore != NULL)
		{
			tuplestore_end(walres->tuplestore);
		}

		if (walres->tupledesc != NULL)
		{
			FreeTupleDesc(walres->tupledesc);
		}

		/* The container goes last, after its members have been read. */
		pfree(walres);
	}
}
/*
 * Functions exported to other modules.  Only the prototypes are visible
 * here; behaviour notes marked NOTE(review) are inferred from names and
 * signatures and must be confirmed against the definitions.
 */

/*
 * Declared as never returning (pg_attribute_noreturn).  Takes an opaque
 * startup data buffer and its length.
 * NOTE(review): presumably the entry point of the receiver process.
 */
extern void WalReceiverMain(char *startup_data, size_t startup_data_len) pg_attribute_noreturn();
/* NOTE(review): presumably services pending interrupts inside the receiver. */
extern void ProcessWalRcvInterrupts(void);
/* NOTE(review): presumably sets WalRcvData.force_reply and wakes the receiver. */
extern void WalRcvForceReply(void);
/* NOTE(review): presumably size and initialise the memory behind WalRcv. */
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
/* NOTE(review): presumably asks a running receiver to stop and waits for it. */
extern void ShutdownWalRcv(void);
/* State predicates returning bool.  NOTE(review): which WalRcvState values count as true is not visible here. */
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
/*
 * Request streaming from position recptr on timeline tli, with the given
 * connection string and slot name.
 * NOTE(review): whether conninfo/slotname may be NULL is not visible here.
 */
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
const char *conninfo, const char *slotname,
bool create_temp_slot);
/*
 * Return a WAL position; both pointer arguments are non-const and so can be
 * written through.
 * NOTE(review): presumably optional out-parameters that may be NULL -- confirm.
 */
extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
/* NOTE(review): presumably reads WalRcvData.writtenUpto. */
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
/* Both return int.  NOTE(review): units (likely milliseconds) and the "unknown" value are not visible here. */
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
#endif