#ifndef REORDERBUFFER_H
#define REORDERBUFFER_H
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "lib/pairingheap.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
/*
 * Integer globals defined in another translation unit.
 *
 * NOTE(review): presumably configuration settings. Neither the unit of
 * logical_decoding_work_mem nor the valid values of
 * debug_logical_replication_streaming are visible in this header (the
 * latter likely holds a DebugLogicalRepStreamingMode value) -- confirm at
 * the definition site.
 */
extern PGDLLIMPORT int logical_decoding_work_mem;
extern PGDLLIMPORT int debug_logical_replication_streaming;
/*
 * Two-valued mode enumeration.
 *
 * NOTE(review): presumably the set of values taken by the integer global
 * debug_logical_replication_streaming -- confirm at its definition site.
 *
 * The numeric values are spelled out; they equal what implicit enum
 * numbering would assign.
 */
typedef enum
{
	DEBUG_LOGICAL_REP_STREAMING_BUFFERED = 0,
	DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE = 1,
} DebugLogicalRepStreamingMode;
/*
 * Discriminator for the kinds of change a ReorderBufferChange can carry
 * (stored in its 'action' member).
 *
 * The numeric values are written out explicitly; they are exactly the
 * values implicit numbering assigns, so the order of the enumerators must
 * not be changed without also updating the numbers.
 */
typedef enum ReorderBufferChangeType
{
	REORDER_BUFFER_CHANGE_INSERT = 0,
	REORDER_BUFFER_CHANGE_UPDATE = 1,
	REORDER_BUFFER_CHANGE_DELETE = 2,
	REORDER_BUFFER_CHANGE_MESSAGE = 3,
	REORDER_BUFFER_CHANGE_INVALIDATION = 4,
	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT = 5,
	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID = 6,
	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID = 7,
	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT = 8,
	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM = 9,
	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT = 10,
	REORDER_BUFFER_CHANGE_TRUNCATE = 11,
} ReorderBufferChangeType;
/* Forward declaration: ReorderBufferChange points back at its transaction. */
struct ReorderBufferTXN;
/*
 * A single change record.
 *
 * Carries a WAL position, a change type, a back-pointer to the owning
 * transaction, an origin id, a type-specific payload union, and an
 * embedded dlist_node so the record can be a member of a dlist.
 *
 * NOTE(review): which member of 'data' is meaningful is presumably
 * determined by 'action'; the mapping is not visible in this header.
 */
typedef struct ReorderBufferChange
{
/* WAL position associated with this change. */
XLogRecPtr lsn;
/* Kind of change; see ReorderBufferChangeType. */
ReorderBufferChangeType action;
/* Transaction this change is attached to. */
struct ReorderBufferTXN *txn;
/* Replication origin id recorded for this change. */
RepOriginId origin_id;
/* Type-specific payload; all members share storage. */
union
{
/*
 * Tuple payload: relation locator plus old and new tuple images.
 * NOTE(review): presumably used for INSERT/UPDATE/DELETE and the
 * speculative-insert variants; either tuple may presumably be NULL
 * depending on the action -- confirm against the users.
 */
struct
{
RelFileLocator rlocator;
/* NOTE(review): presumably tells the consumer to drop accumulated toast data after this change -- confirm. */
bool clear_toast_afterwards;
HeapTuple oldtuple;
HeapTuple newtuple;
} tp;
/*
 * Truncate payload: 'relids' points at an array of relation OIDs with
 * 'nrelids' giving its length (presumably -- confirm), plus two
 * boolean options.
 */
struct
{
Size nrelids;
bool cascade;
bool restart_seqs;
Oid *relids;
} truncate;
/*
 * Message payload: a prefix string and a message buffer with its size
 * in 'message_size'.
 */
struct
{
char *prefix;
Size message_size;
char *message;
} msg;
/* Snapshot payload. */
Snapshot snapshot;
/* Command id payload. */
CommandId command_id;
/*
 * Tuple/command-id payload: identifies a tuple by relation locator and
 * item pointer and records cmin, cmax and combo command id for it.
 */
struct
{
RelFileLocator locator;
ItemPointerData tid;
CommandId cmin;
CommandId cmax;
CommandId combocid;
} tuplecid;
/*
 * Invalidation payload: 'invalidations' points at an array of shared
 * invalidation messages, 'ninvalidations' is presumably its length.
 */
struct
{
uint32 ninvalidations;
SharedInvalidationMessage *invalidations;
} inval;
} data;
/* List linkage for membership in a dlist of changes. */
dlist_node node;
} ReorderBufferChange;
/*
 * Bit values for the txn_flags member of a transaction.  Each flag owns a
 * distinct bit so they can be OR-ed together freely.
 */
#define RBTXN_HAS_CATALOG_CHANGES	0x0001
#define RBTXN_IS_SUBXACT			0x0002
#define RBTXN_IS_SERIALIZED			0x0004
#define RBTXN_IS_SERIALIZED_CLEAR	0x0008
#define RBTXN_IS_STREAMED			0x0010
#define RBTXN_HAS_PARTIAL_CHANGE	0x0020
#define RBTXN_PREPARE				0x0040
#define RBTXN_SKIPPED_PREPARE		0x0080
#define RBTXN_HAS_STREAMABLE_CHANGE	0x0100

/*
 * Shared implementation of the flag predicates below: evaluates to 1 if
 * any bit of 'flag' is set in (txn)->txn_flags, else 0.
 */
#define rbtxn_flag_is_set(txn, flag) \
	((((txn)->txn_flags) & (flag)) != 0)

/* One predicate per flag; each yields 1 when the flag is set, 0 otherwise. */
#define rbtxn_has_catalog_changes(txn) \
	rbtxn_flag_is_set(txn, RBTXN_HAS_CATALOG_CHANGES)

/*
 * Tests the RBTXN_IS_SUBXACT flag.  Note this is a different test from
 * rbtxn_is_subtxn(), which looks at the toptxn pointer instead.
 */
#define rbtxn_is_known_subxact(txn) \
	rbtxn_flag_is_set(txn, RBTXN_IS_SUBXACT)

#define rbtxn_is_serialized(txn) \
	rbtxn_flag_is_set(txn, RBTXN_IS_SERIALIZED)

#define rbtxn_is_serialized_clear(txn) \
	rbtxn_flag_is_set(txn, RBTXN_IS_SERIALIZED_CLEAR)

#define rbtxn_has_partial_change(txn) \
	rbtxn_flag_is_set(txn, RBTXN_HAS_PARTIAL_CHANGE)

#define rbtxn_has_streamable_change(txn) \
	rbtxn_flag_is_set(txn, RBTXN_HAS_STREAMABLE_CHANGE)

#define rbtxn_is_streamed(txn) \
	rbtxn_flag_is_set(txn, RBTXN_IS_STREAMED)

#define rbtxn_prepared(txn) \
	rbtxn_flag_is_set(txn, RBTXN_PREPARE)

#define rbtxn_skip_prepared(txn) \
	rbtxn_flag_is_set(txn, RBTXN_SKIPPED_PREPARE)

/* A transaction with no toptxn pointer is a top-level transaction. */
#define rbtxn_is_toptxn(txn)	((txn)->toptxn == NULL)

/* A transaction with a toptxn pointer is a subtransaction. */
#define rbtxn_is_subtxn(txn)	((txn)->toptxn != NULL)

/*
 * Yields the toptxn pointer for a subtransaction, or the transaction
 * itself when it has none.  The argument is evaluated more than once, so
 * it must not have side effects.
 */
#define rbtxn_get_toptxn(txn) \
	(rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn))
/*
 * Per-transaction state.
 *
 * Holds identifying information, a set of WAL positions, snapshot state,
 * lists of queued changes and subtransactions, accumulated invalidation
 * messages, size accounting, and several embedded list/heap nodes that
 * allow the transaction to be a member of multiple containers at once.
 *
 * NOTE(review): the member comments below describe what the declarations
 * show; anything about when a member is set or how it is consumed is
 * marked as an assumption and should be confirmed against the
 * implementation.
 */
typedef struct ReorderBufferTXN
{
/* Bitmask of RBTXN_* flags; tested via the rbtxn_*() macros. */
bits32 txn_flags;
/* Transaction id of this (sub)transaction. */
TransactionId xid;
/* NOTE(review): presumably the xid of the owning top-level transaction -- confirm. */
TransactionId toplevel_xid;
/* NOTE(review): presumably the global identifier of a prepared transaction -- confirm. */
char *gid;
/*
 * WAL positions associated with the transaction.
 * NOTE(review): presumably first record, final (commit/abort/prepare)
 * record, and the position just past that record -- confirm.
 */
XLogRecPtr first_lsn;
XLogRecPtr final_lsn;
XLogRecPtr end_lsn;
/*
 * Owning top-level transaction, or NULL if this is itself top-level
 * (this is what rbtxn_is_toptxn()/rbtxn_is_subtxn() test).
 */
struct ReorderBufferTXN *toptxn;
/* NOTE(review): presumably the WAL position decoding could restart from with respect to this transaction -- confirm. */
XLogRecPtr restart_decoding_lsn;
/* Replication origin id and origin WAL position recorded for the transaction. */
RepOriginId origin_id;
XLogRecPtr origin_lsn;
/*
 * Timestamp of the transaction's terminating event.  The three names
 * share one storage location; only one is meaningful at a time.
 */
union
{
TimestampTz commit_time;
TimestampTz prepare_time;
TimestampTz abort_time;
} xact_time;
/*
 * Base snapshot, the WAL position associated with it, and list linkage.
 * NOTE(review): base_snapshot_node presumably links the transaction into
 * ReorderBuffer.txns_by_base_snapshot_lsn -- confirm.
 */
Snapshot base_snapshot;
XLogRecPtr base_snapshot_lsn;
dlist_node base_snapshot_node;
/* NOTE(review): presumably the snapshot and command id in effect while the transaction is being replayed/streamed -- confirm. */
Snapshot snapshot_now;
CommandId command_id;
/*
 * Change counters.
 * NOTE(review): presumably total number of changes and the number
 * currently held in memory -- confirm.
 */
uint64 nentries;
uint64 nentries_mem;
/* List of queued changes (presumably ReorderBufferChange entries linked via their 'node' member). */
dlist_head changes;
/* List and count of tuplecid entries, plus a hash table built over them (presumably for lookup -- confirm). */
dlist_head tuplecids;
uint64 ntuplecids;
HTAB *tuplecid_hash;
/* Hash table for toast data (presumably toast chunks being reassembled -- confirm). */
HTAB *toast_hash;
/* List and count of subtransactions. */
dlist_head subtxns;
uint32 nsubtxns;
/* Accumulated shared invalidation messages: array pointer and (presumably) element count. */
uint32 ninvalidations;
SharedInvalidationMessage *invalidations;
/*
 * Container linkage.
 * NOTE(review): 'node' presumably links into a top-level list or a
 * parent's 'subtxns' list, 'catchange_node' into
 * ReorderBuffer.catchange_txns, and 'txn_node' into
 * ReorderBuffer.txn_heap -- confirm.
 */
dlist_node node;
dlist_node catchange_node;
pairingheap_node txn_node;
/*
 * Size accounting.
 * NOTE(review): presumably 'size' covers this transaction alone and
 * 'total_size' also includes its subtransactions -- confirm.
 */
Size size;
Size total_size;
/* NOTE(review): presumably set when the transaction was found to have aborted concurrently with decoding -- confirm. */
bool concurrent_abort;
/* Opaque pointer reserved for the output plugin's own per-transaction data. */
void *output_plugin_private;
} ReorderBufferTXN;
/* Typedef for struct ReorderBuffer, so the callback types can refer to it before it is defined. */
typedef struct ReorderBuffer ReorderBuffer;
/*
 * Callback function-pointer types.  Every callback receives the
 * ReorderBuffer and the transaction concerned; additional arguments vary
 * by event.  All return void.
 *
 * NOTE(review): the events each callback corresponds to are inferred from
 * the type names; the call sites are not visible in this header.
 */

/* Per-change callback: receives the relation and the change. */
typedef void (*ReorderBufferApplyChangeCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
/* Truncate callback: receives an array of 'nrelations' relations and the change. */
typedef void (*ReorderBufferApplyTruncateCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
/* Begin callback: no arguments beyond buffer and transaction. */
typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn);
/* Commit callback: receives the commit WAL position. */
typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
/* Message callback: receives WAL position, a transactional flag, prefix, and message bytes with size 'sz'. */
typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix, Size sz,
const char *message);
/* Begin-prepare callback: same signature as ReorderBufferBeginCB. */
typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn);
/* Prepare callback: receives the prepare WAL position. */
typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
/* Commit-prepared callback: receives the commit WAL position. */
typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
/* Rollback-prepared callback: receives the end position and timestamp of the prepare. */
typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
/* Stream-start callback: receives a first WAL position. */
typedef void (*ReorderBufferStreamStartCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr first_lsn);
/* Stream-stop callback: receives a last WAL position. */
typedef void (*ReorderBufferStreamStopCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr last_lsn);
/* Stream-abort callback: receives the abort WAL position. */
typedef void (*ReorderBufferStreamAbortCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
/* Stream-prepare callback: receives the prepare WAL position. */
typedef void (*ReorderBufferStreamPrepareCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
/* Stream-commit callback: receives the commit WAL position. */
typedef void (*ReorderBufferStreamCommitCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
/* Streaming counterpart of ReorderBufferApplyChangeCB (identical signature). */
typedef void (*ReorderBufferStreamChangeCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
/* Streaming counterpart of ReorderBufferMessageCB (identical signature). */
typedef void (*ReorderBufferStreamMessageCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix, Size sz,
const char *message);
/* Streaming counterpart of ReorderBufferApplyTruncateCB (identical signature). */
typedef void (*ReorderBufferStreamTruncateCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
/* Progress-update callback: receives a WAL position. */
typedef void (*ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr lsn);
/*
 * Top-level reorder buffer state.
 *
 * Holds the containers that index transactions, the set of callbacks to
 * invoke, memory contexts, an output scratch buffer, size accounting and
 * statistics counters.
 *
 * NOTE(review): comments on how members are used are inferred from names
 * and types; confirm against the implementation.
 */
struct ReorderBuffer
{
/* Hash table of transactions (presumably keyed by xid -- confirm). */
HTAB *by_txn;
/* List of transactions (presumably top-level ones ordered by LSN -- confirm). */
dlist_head toplevel_by_lsn;
/* List of transactions (presumably those with a base snapshot, ordered by base_snapshot_lsn -- confirm). */
dlist_head txns_by_base_snapshot_lsn;
/* Counted list of transactions (presumably those that made catalog changes -- confirm). */
dclist_head catchange_txns;
/* NOTE(review): looks like a one-entry cache of the most recent by_txn lookup (xid and result) -- confirm. */
TransactionId by_txn_last_xid;
ReorderBufferTXN *by_txn_last_txn;
/* Callbacks for non-streamed transaction replay. */
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;
/*
 * Callbacks for prepared transactions.
 * NOTE(review): begin_prepare is declared with ReorderBufferBeginCB
 * although a ReorderBufferBeginPrepareCB typedef with the same signature
 * exists in this header; the two types are interchangeable, but the
 * dedicated typedef may have been intended.
 */
ReorderBufferBeginCB begin_prepare;
ReorderBufferPrepareCB prepare;
ReorderBufferCommitPreparedCB commit_prepared;
ReorderBufferRollbackPreparedCB rollback_prepared;
/* Callbacks for streamed transactions. */
ReorderBufferStreamStartCB stream_start;
ReorderBufferStreamStopCB stream_stop;
ReorderBufferStreamAbortCB stream_abort;
ReorderBufferStreamPrepareCB stream_prepare;
ReorderBufferStreamCommitCB stream_commit;
ReorderBufferStreamChangeCB stream_change;
ReorderBufferStreamMessageCB stream_message;
ReorderBufferStreamTruncateCB stream_truncate;
/* Progress-update callback. */
ReorderBufferUpdateProgressTxnCB update_progress_txn;
/* Opaque pointer for the owner of the buffer (passed through, not interpreted here). */
void *private_data;
/* NOTE(review): presumably controls whether changes produced by table rewrites are output -- confirm. */
bool output_rewrites;
/*
 * Memory contexts.
 * NOTE(review): presumably 'context' is the parent and the others hold
 * changes, transactions and tuple buffers respectively -- confirm.
 */
MemoryContext context;
MemoryContext change_context;
MemoryContext txn_context;
MemoryContext tup_context;
/* NOTE(review): presumably the restart_decoding_lsn assigned to newly tracked transactions -- confirm. */
XLogRecPtr current_restart_decoding_lsn;
/* Scratch output buffer and its allocated size. */
char *outbuf;
Size outbufsize;
/* NOTE(review): presumably the total memory currently accounted to buffered changes -- confirm. */
Size size;
/* Pairing heap of transactions (presumably ordered by size to pick the largest -- confirm). */
pairingheap *txn_heap;
/*
 * Statistics counters.
 * NOTE(review): presumably transactions / occurrences / bytes spilled to
 * disk, the same three for streaming, and overall totals -- confirm.
 */
int64 spillTxns;
int64 spillCount;
int64 spillBytes;
int64 streamTxns;
int64 streamCount;
int64 streamBytes;
int64 totalTxns;
int64 totalBytes;
};
/*
 * Public function prototypes.
 *
 * NOTE(review): the implementations are not visible in this header; the
 * grouping comments below are inferred from function names and parameter
 * lists and should be confirmed against the implementation.
 */

/* Creation and destruction of a ReorderBuffer. */
extern ReorderBuffer *ReorderBufferAllocate(void);
extern void ReorderBufferFree(ReorderBuffer *rb);
/*
 * Get/Return pairs for tuple buffers, change records and relid arrays.
 * Note ReorderBufferReturnTupleBuf takes only the tuple, not the buffer.
 */
extern HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb,
Size tuple_len);
extern void ReorderBufferReturnTupleBuf(HeapTuple tuple);
extern ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *rb);
extern void ReorderBufferReturnChange(ReorderBuffer *rb,
ReorderBufferChange *change, bool upd_mem);
extern Oid *ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids);
extern void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids);
/* Queue a change or a message for transaction 'xid' at WAL position 'lsn'. */
extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn, ReorderBufferChange *change,
bool toast_insert);
extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
Snapshot snap, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);
/* Transaction completion: commit, and finishing (commit or rollback, per 'is_commit') of a prepared transaction. */
extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
extern void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
XLogRecPtr two_phase_at,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit);
/* Subtransaction handling: both take the parent 'xid' and the child 'subxid'. */
extern void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr lsn);
extern void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr commit_lsn,
XLogRecPtr end_lsn);
/* Abort, forget or invalidate a transaction; ReorderBufferAbortOld takes an oldest-running xid rather than a single xid. */
extern void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
TimestampTz abort_time);
extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid);
extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
/* Record snapshot, command-id, tuple-cid and invalidation information for a transaction. */
extern void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn, Snapshot snap);
extern void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn, Snapshot snap);
extern void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn, CommandId cid);
extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn, RelFileLocator locator,
ItemPointerData tid,
CommandId cmin, CommandId cmax, CommandId combocid);
extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
/* Takes invalidation messages directly, with no xid or lsn. */
extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
SharedInvalidationMessage *invalidations);
/* Per-xid bookkeeping and queries. */
extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
extern void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
extern bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid);
extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid);
/* Prepared-transaction support.  ReorderBufferRememberPrepareInfo returns bool; the meaning of the result is not visible here. */
extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
TimestampTz prepare_time,
RepOriginId origin_id, XLogRecPtr origin_lsn);
extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
/* Accessors over the set of tracked transactions. */
extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *rb);
extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
/* Returns a pointer to TransactionId; NOTE(review): presumably an allocated array the caller owns -- confirm length and ownership. */
extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
/* Takes no ReorderBuffer argument; NOTE(review): presumably one-time startup work -- confirm. */
extern void StartupReorderBuffer(void);
#endif