#ifndef REORDERBUFFER_H
#define REORDERBUFFER_H
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
extern PGDLLIMPORT int logical_decoding_work_mem;
typedef struct ReorderBufferTupleBuf
{
slist_node node;
HeapTupleData tuple;
Size alloc_tuple_size;
} ReorderBufferTupleBuf;
#define ReorderBufferTupleBufData(p) \
((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
enum ReorderBufferChangeType
{
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_UPDATE,
REORDER_BUFFER_CHANGE_DELETE,
REORDER_BUFFER_CHANGE_MESSAGE,
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
REORDER_BUFFER_CHANGE_TRUNCATE
};
struct ReorderBufferTXN;
typedef struct ReorderBufferChange
{
XLogRecPtr lsn;
enum ReorderBufferChangeType action;
struct ReorderBufferTXN *txn;
RepOriginId origin_id;
union
{
struct
{
RelFileNode relnode;
bool clear_toast_afterwards;
ReorderBufferTupleBuf *oldtuple;
ReorderBufferTupleBuf *newtuple;
} tp;
struct
{
Size nrelids;
bool cascade;
bool restart_seqs;
Oid *relids;
} truncate;
struct
{
char *prefix;
Size message_size;
char *message;
} msg;
Snapshot snapshot;
CommandId command_id;
struct
{
RelFileNode node;
ItemPointerData tid;
CommandId cmin;
CommandId cmax;
CommandId combocid;
} tuplecid;
} data;
dlist_node node;
} ReorderBufferChange;
#define RBTXN_HAS_CATALOG_CHANGES 0x0001
#define RBTXN_IS_SUBXACT 0x0002
#define RBTXN_IS_SERIALIZED 0x0004
#define rbtxn_has_catalog_changes(txn) \
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define rbtxn_is_known_subxact(txn) \
( \
((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
)
#define rbtxn_is_serialized(txn) \
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
typedef struct ReorderBufferTXN
{
bits32 txn_flags;
TransactionId xid;
TransactionId toplevel_xid;
XLogRecPtr first_lsn;
XLogRecPtr final_lsn;
XLogRecPtr end_lsn;
XLogRecPtr restart_decoding_lsn;
RepOriginId origin_id;
XLogRecPtr origin_lsn;
TimestampTz commit_time;
Snapshot base_snapshot;
XLogRecPtr base_snapshot_lsn;
dlist_node base_snapshot_node;
uint64 nentries;
uint64 nentries_mem;
dlist_head changes;
dlist_head tuplecids;
uint64 ntuplecids;
HTAB *tuplecid_hash;
HTAB *toast_hash;
dlist_head subtxns;
uint32 nsubtxns;
uint32 ninvalidations;
SharedInvalidationMessage *invalidations;
dlist_node node;
Size size;
} ReorderBufferTXN;
typedef struct ReorderBuffer ReorderBuffer;
typedef void (*ReorderBufferApplyChangeCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
typedef void (*ReorderBufferApplyTruncateCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn);
typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix, Size sz,
const char *message);
struct ReorderBuffer
{
HTAB *by_txn;
dlist_head toplevel_by_lsn;
dlist_head txns_by_base_snapshot_lsn;
TransactionId by_txn_last_xid;
ReorderBufferTXN *by_txn_last_txn;
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;
void *private_data;
bool output_rewrites;
MemoryContext context;
MemoryContext change_context;
MemoryContext txn_context;
MemoryContext tup_context;
XLogRecPtr current_restart_decoding_lsn;
char *outbuf;
Size outbufsize;
Size size;
};
ReorderBuffer *ReorderBufferAllocate(void);
void ReorderBufferFree(ReorderBuffer *);
ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
CommandId cid);
void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
RelFileNode node, ItemPointerData pt,
CommandId cmin, CommandId cmax, CommandId combocid);
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
SharedInvalidationMessage *invalidations);
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
void StartupReorderBuffer(void);
#endif