#include "db_config.h"
#include "db_int.h"
#include "dbinc/crypto.h"
#include "dbinc/hmac.h"
#include "dbinc/log.h"
#include "dbinc/txn.h"
#include "dbinc/db_page.h"
#include "dbinc_auto/db_ext.h"
/*
 * Forward declarations for the file-local (static) helpers defined later
 * in this file.
 * NOTE(review): __P() presumably expands to the parenthesized parameter
 * list on ANSI compilers -- confirm against its definition.
 */
static int __log_encrypt_record __P((ENV *, DBT *, HDR *, u_int32_t));
static int __log_file __P((ENV *, const DB_LSN *, char *, size_t));
static int __log_fill __P((DB_LOG *, DB_LSN *, void *, u_int32_t));
static int __log_flush_commit __P((ENV *, const DB_LSN *, u_int32_t));
static int __log_newfh __P((DB_LOG *, int));
static int __log_put_next __P((ENV *,
DB_LSN *, const DBT *, HDR *, DB_LSN *));
static int __log_put_record_int __P((ENV *, DB *, DB_TXN *, DB_LSN *,
u_int32_t, u_int32_t, u_int32_t, u_int32_t, DB_LOG_RECSPEC *, va_list));
static int __log_putr __P((DB_LOG *,
DB_LSN *, const DBT *, u_int32_t, HDR *));
static int __log_write __P((DB_LOG *, void *, u_int32_t));
/*
 * __log_put_pp --
 *	Argument-checking front end for DB_ENV->log_put.
 *
 * Verifies that logging is configured, that only the accepted flags are
 * set (and that DB_LOG_WRNOSYNC is not combined with DB_FLUSH), and that
 * the environment is not a replication client.  The real work is done by
 * __log_put, called inside the ENV_ENTER/ENV_LEAVE and REPLICATION_WRAP
 * brackets.
 *
 * Returns 0 on success, EINVAL for a replication client, or the error
 * produced by the flag checks or by __log_put.
 */
int
__log_put_pp(dbenv, lsnp, udbt, flags)
	DB_ENV *dbenv;
	DB_LSN *lsnp;
	const DBT *udbt;
	u_int32_t flags;
{
	DB_THREAD_INFO *ip;
	ENV *env;
	int ret;

	env = dbenv->env;

	ENV_REQUIRES_CONFIG(env,
	    env->lg_handle, "DB_ENV->log_put", DB_INIT_LOG);

	/* Only this set of flags is accepted. */
	ret = __db_fchk(env, "DB_ENV->log_put", flags,
	    DB_LOG_CHKPNT | DB_LOG_COMMIT |
	    DB_FLUSH | DB_LOG_NOCOPY | DB_LOG_WRNOSYNC);
	if (ret != 0)
		return (ret);

	/* DB_FLUSH and DB_LOG_WRNOSYNC may not be combined. */
	if (LF_ISSET(DB_FLUSH) && LF_ISSET(DB_LOG_WRNOSYNC))
		return (__db_ferr(env, "DB_ENV->log_put", 1));

	/* Replication clients may not write log records through this API. */
	if (IS_REP_CLIENT(env)) {
		__db_errx(env, DB_STR("2511",
		    "DB_ENV->log_put is illegal on replication clients"));
		return (EINVAL);
	}

	ENV_ENTER(env, ip);
	REPLICATION_WRAP(env, (__log_put(env, lsnp, udbt, flags)), 0, ret);
	ENV_LEAVE(env, ip);

	return (ret);
}
/*
 * __log_put --
 *	Append the record described by udbt to the log and return the LSN
 *	it was written at in *lsnp.
 *
 * flags may include DB_LOG_NOCOPY, DB_LOG_COMMIT, DB_LOG_CHKPNT, DB_FLUSH
 * and DB_LOG_WRNOSYNC (all are tested below).  On a replication master the
 * record is also sent to clients, and a write or flush failure is turned
 * into an environment panic.
 *
 * Returns 0 on success or a non-zero error.
 */
int
__log_put(env, lsnp, udbt, flags)
ENV *env;
DB_LSN *lsnp;
const DBT *udbt;
u_int32_t flags;
{
DBT *dbt, t;
DB_CIPHER *db_cipher;
DB_LOG *dblp;
DB_LSN lsn, old_lsn;
DB_REP *db_rep;
HDR hdr;
LOG *lp;
REP *rep;
int lock_held, need_free, ret;
u_int8_t *key;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
db_cipher = env->crypto_handle;
db_rep = env->rep_handle;
if (db_rep != NULL)
rep = db_rep->region;
else
rep = NULL;
/*
 * Work on a local copy (t) of the caller's DBT so that its size and data
 * pointer can be replaced without touching udbt.
 */
dbt = &t;
t = *udbt;
lock_held = need_free = 0;
/* old_lsn stays zero unless __log_put_next switches log files. */
ZERO_LSN(old_lsn);
hdr.len = hdr.prev = 0;
/*
 * A replication master with no send function set: threaded repmgr
 * applications get repmgr auto-started; anything else is rejected
 * (unless built with DEBUG_ROP/DEBUG_WOP).
 */
if (IS_REP_MASTER(env) && db_rep->send == NULL) {
#ifdef HAVE_REPLICATION_THREADS
if (F_ISSET(env, ENV_THREAD) && APP_IS_REPMGR(env)) {
if ((ret = __repmgr_autostart(env)) != 0)
return (ret);
} else
#endif
{
#if !defined(DEBUG_ROP) && !defined(DEBUG_WOP)
__db_errx(env, DB_STR("2512",
"Non-replication DB_ENV handle attempting "
"to modify a replicated environment"));
return (EINVAL);
#endif
}
}
DB_ASSERT(env, !IS_REP_CLIENT(env));
/*
 * Copy the record into private memory unless the caller passed
 * DB_LOG_NOCOPY and this is not a master.  The record may be encrypted in
 * place below, while the replication code further down sends udbt, so on
 * a master udbt must not be the buffer that gets encrypted.  With
 * encryption on, the copy is enlarged by the cipher's adj_size().
 */
if (!LF_ISSET(DB_LOG_NOCOPY) || IS_REP_MASTER(env)) {
if (CRYPTO_ON(env))
t.size += db_cipher->adj_size(udbt->size);
if ((ret = __os_calloc(env, 1, t.size, &t.data)) != 0)
goto err;
need_free = 1;
memcpy(t.data, udbt->data, udbt->size);
}
/* Sets hdr.size, and encrypts dbt in place when crypto is on. */
if ((ret = __log_encrypt_record(env, dbt, &hdr, udbt->size)) != 0)
goto err;
if (CRYPTO_ON(env))
key = db_cipher->mac_key;
else
key = NULL;
#ifdef HAVE_LOG_CHECKSUM
/* Checksum computed here, before the log region lock is taken. */
__db_chksum(&hdr, dbt->data, dbt->size, key, hdr.chksum);
#endif
LOG_SYSTEM_LOCK(env);
lock_held = 1;
if ((ret = __log_put_next(env, &lsn, dbt, &hdr, &old_lsn)) != 0)
goto panic_check;
/* Hand the LSN back to the caller while the region lock is still held. */
lsnp->file = lsn.file;
lsnp->offset = lsn.offset;
#ifdef HAVE_REPLICATION
if (IS_REP_MASTER(env)) {
__rep_newfile_args nf_args;
DBT newfiledbt;
REP_BULK bulk;
size_t len;
u_int32_t ctlflags;
u_int8_t buf[__REP_NEWFILE_SIZE];
/* Commit and checkpoint records are sent with REPCTL_PERM. */
ctlflags = LF_ISSET(DB_LOG_COMMIT | DB_LOG_CHKPNT) ?
REPCTL_PERM : 0;
/* The messages below are sent without the log region lock. */
LOG_SYSTEM_UNLOCK(env);
lock_held = 0;
if (LF_ISSET(DB_FLUSH))
ctlflags |= REPCTL_FLUSH;
/* A non-zero old_lsn means the log file changed: broadcast REP_NEWFILE. */
if (!IS_ZERO_LSN(old_lsn)) {
memset(&newfiledbt, 0, sizeof(newfiledbt));
nf_args.version = lp->persist.version;
(void)__rep_newfile_marshal(env, &nf_args,
buf, __REP_NEWFILE_SIZE, &len);
DB_INIT_DBT(newfiledbt, buf, len);
(void)__rep_send_message(env, DB_EID_BROADCAST,
REP_NEWFILE, &old_lsn, &newfiledbt, 0, 0);
}
ret = 0;
/* With bulk transfer configured, try the bulk buffer first. */
if (FLD_ISSET(rep->config, REP_C_BULK)) {
if (db_rep->bulk == NULL)
db_rep->bulk = R_ADDR(&dblp->reginfo,
lp->bulk_buf);
memset(&bulk, 0, sizeof(bulk));
bulk.addr = db_rep->bulk;
bulk.offp = &lp->bulk_off;
bulk.len = lp->bulk_len;
bulk.lsn = lsn;
bulk.type = REP_BULK_LOG;
bulk.eid = DB_EID_BROADCAST;
bulk.flagsp = &lp->bulk_flags;
ret = __rep_bulk_message(env, &bulk, NULL,
&lsn, udbt, ctlflags);
}
/*
 * Send the record as a single REP_LOG message when bulk is off or the
 * bulk call returned DB_REP_BULKOVF.  Note that udbt, not the possibly
 * encrypted dbt, is what gets sent.
 */
if (!FLD_ISSET(rep->config, REP_C_BULK) ||
ret == DB_REP_BULKOVF) {
ret = __rep_send_message(env, DB_EID_BROADCAST,
REP_LOG, &lsn, udbt, ctlflags, 0);
}
/* For PERM records, advance max_perm_lsn (never backwards) under the lock. */
if (FLD_ISSET(ctlflags, REPCTL_PERM)) {
LOG_SYSTEM_LOCK(env);
#ifdef HAVE_STATISTICS
if (IS_USING_LEASES(env))
rep->stat.st_lease_sends++;
#endif
if (LOG_COMPARE(&lp->max_perm_lsn, &lsn) < 0)
lp->max_perm_lsn = lsn;
LOG_SYSTEM_UNLOCK(env);
}
/* A failed send of a PERM record forces a local flush of the record. */
if (ret != 0 && FLD_ISSET(ctlflags, REPCTL_PERM))
LF_SET(DB_FLUSH);
/* Send failures are otherwise ignored. */
ret = 0;
}
#endif
/* Flush (DB_FLUSH) or write out the buffer (DB_LOG_WRNOSYNC) if asked to. */
if (LF_ISSET(DB_FLUSH | DB_LOG_WRNOSYNC)) {
if (!lock_held) {
LOG_SYSTEM_LOCK(env);
lock_held = 1;
}
if ((ret = __log_flush_commit(env, &lsn, flags)) != 0)
goto panic_check;
}
/* A checkpoint record resets the st_wc_bytes/st_wc_mbytes counters. */
if (LF_ISSET(DB_LOG_CHKPNT))
lp->stat.st_wc_bytes = lp->stat.st_wc_mbytes = 0;
STAT(++lp->stat.st_record);
/* The panic_check label is reachable only by goto; normal flow skips it. */
if (0) {
panic_check:
/* On a replication master a failed put or flush panics the environment. */
if (ret != 0 && IS_REP_MASTER(env))
ret = __env_panic(env, ret);
}
err: if (lock_held)
LOG_SYSTEM_UNLOCK(env);
if (need_free)
__os_free(env, dbt->data);
/* After a successful file switch, run auto-removal if it is configured. */
if (ret == 0 && !IS_ZERO_LSN(old_lsn) && lp->db_log_autoremove)
__log_autoremove(env);
return (ret);
}
/*
 * __log_current_lsn_int --
 *	Report the LSN of the most recently written log record.
 *
 * Under the log region lock, copies lp->lsn into *lsnp and, when the
 * current offset is greater than the last record length (lp->len), backs
 * the offset up by that length.  If mbytesp is non-NULL, the st_wc_mbytes
 * counter is stored through it, and st_wc_bytes plus the current buffer
 * offset is stored through bytesp (which must then also be non-NULL).
 *
 * Always returns 0.
 */
int
__log_current_lsn_int(env, lsnp, mbytesp, bytesp)
	ENV *env;
	DB_LSN *lsnp;
	u_int32_t *mbytesp, *bytesp;
{
	DB_LOG *dblp;
	LOG *lp;

	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;

	LOG_SYSTEM_LOCK(env);

	/* Start from the next-record LSN, then step back one record. */
	*lsnp = lp->lsn;
	if (lp->lsn.offset > lp->len)
		lsnp->offset = lp->lsn.offset - lp->len;

	/* Optional byte counters; b_off adds what is still in the buffer. */
	if (mbytesp != NULL) {
		*mbytesp = lp->stat.st_wc_mbytes;
		*bytesp = (u_int32_t)(lp->stat.st_wc_bytes + lp->b_off);
	}

	LOG_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __log_current_lsn --
 *	Wrapper around __log_current_lsn_int that brackets the call with
 *	ENV_ENTER/ENV_LEAVE.  Arguments and return value are those of
 *	__log_current_lsn_int.
 */
int
__log_current_lsn(env, lsnp, mbytesp, bytesp)
	ENV *env;
	DB_LSN *lsnp;
	u_int32_t *mbytesp, *bytesp;
{
	DB_THREAD_INFO *ip;
	int ret;

	ENV_ENTER(env, ip);
	ret = __log_current_lsn_int(env, lsnp, mbytesp, bytesp);
	ENV_LEAVE(env, ip);

	return (ret);
}
static int
__log_put_next(env, lsn, dbt, hdr, old_lsnp)
ENV *env;
DB_LSN *lsn;
const DBT *dbt;
HDR *hdr;
DB_LSN *old_lsnp;
{
DB_LOG *dblp;
DB_LSN old_lsn;
LOG *lp;
int adv_file, newfile, ret;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
old_lsn = lp->lsn;
newfile = 0;
adv_file = 0;
if (lp->persist.version != DB_LOGVERSION) {
__log_set_version(env, DB_LOGVERSION);
adv_file = 1;
}
if (adv_file || lp->lsn.offset == 0 ||
lp->lsn.offset + hdr->size + dbt->size > lp->log_size) {
if (hdr->size + sizeof(LOGP) + dbt->size > lp->log_size) {
__db_errx(env, DB_STR_A("2513",
"DB_ENV->log_put: record larger than maximum file size (%lu > %lu)",
"%lu %lu"),
(u_long)hdr->size + sizeof(LOGP) + dbt->size,
(u_long)lp->log_size);
return (EINVAL);
}
if ((ret = __log_newfile(dblp, NULL, 0, 0)) != 0)
return (ret);
newfile = 1;
}
if (newfile)
*old_lsnp = old_lsn;
return (__log_putr(dblp, lsn, dbt, lp->lsn.offset - lp->len, hdr));
}
/*
 * __log_flush_commit --
 *	Flush (DB_FLUSH) or write out (otherwise) the log for the record at
 *	*lsnp, and deal with a failure while doing so for a commit record.
 *
 * If the flush/write succeeds, or the record is not a commit
 * (DB_LOG_COMMIT not set), the result is returned unchanged.  If it
 * failed for a commit record that is not in the region still in doubt,
 * 0 is returned.  A replication master panics.  Otherwise the commit
 * record is overwritten via __txn_force_abort -- in the log buffer if it
 * is still there, or read back from the file, changed and rewritten --
 * another flush is attempted, and the original error is returned.
 *
 * Called with the log region lock held (see __log_put).
 */
static int
__log_flush_commit(env, lsnp, flags)
	ENV *env;
	const DB_LSN *lsnp;
	u_int32_t flags;
{
	DB_LOG *dblp;
	DB_LSN flush_lsn;
	HDR hdr;
	LOG *lp;
	int ret, t_ret;
	size_t nr, nw;
	u_int8_t *buffer;

	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;
	flush_lsn = *lsnp;
	ret = 0;

	/*
	 * DB_FLUSH: flush through this record.  Otherwise (DB_LOG_WRNOSYNC
	 * in __log_put): just write whatever is in the on-disk log buffer.
	 */
	if (LF_ISSET(DB_FLUSH))
		ret = __log_flush_int(dblp, &flush_lsn, 1);
	else if (!lp->db_log_inmemory && lp->b_off != 0)
		if ((ret = __log_write(dblp,
		    dblp->bufp, (u_int32_t)lp->b_off)) == 0)
			lp->b_off = 0;

	/* Success, or a failure on a non-commit record: just report it. */
	if (ret == 0 || !LF_ISSET(DB_LOG_COMMIT))
		return (ret);

	/*
	 * The failure is ignored when the commit record lies in a different
	 * file from, or at a lower offset than, the sync point (DB_FLUSH)
	 * or the write offset (otherwise).
	 */
	if (LF_ISSET(DB_FLUSH) ?
	    flush_lsn.file != lp->s_lsn.file ||
	    flush_lsn.offset < lp->s_lsn.offset :
	    flush_lsn.file != lp->lsn.file || flush_lsn.offset < lp->w_off)
		return (0);

	if (IS_REP_MASTER(env)) {
		__db_err(env, ret, DB_STR("2514",
		    "Write failed on MASTER commit."));
		return (__env_panic(env, ret));
	}

	if (flush_lsn.offset > lp->w_off) {
		/* The record is still in the log buffer: change it there. */
		if ((t_ret = __txn_force_abort(env,
		    dblp->bufp + flush_lsn.offset - lp->w_off)) != 0)
			return (__env_panic(env, t_ret));
	} else {
		/*
		 * The record was already written to the file: read its
		 * header to learn the length, then read the whole record,
		 * change it and write it back in place.
		 */
		if (
		    (t_ret = __os_seek(env,
		    dblp->lfhp, 0, 0, flush_lsn.offset)) != 0 ||
		    (t_ret = __os_read(env, dblp->lfhp, &hdr,
		    HDR_NORMAL_SZ, &nr)) != 0 || nr != HDR_NORMAL_SZ)
			return (__env_panic(env, t_ret == 0 ? EIO : t_ret));
		if (LOG_SWAPPED(env))
			__log_hdrswap(&hdr, CRYPTO_ON(env));

		if ((t_ret = __os_malloc(env, hdr.len, &buffer)) != 0)
			return (__env_panic(env, t_ret));

		if ((t_ret = __os_seek(env,
		    dblp->lfhp, 0, 0, flush_lsn.offset)) != 0 ||
		    (t_ret = __os_read(env, dblp->lfhp, buffer,
		    hdr.len, &nr)) != 0 || nr != hdr.len ||
		    (t_ret = __txn_force_abort(env, buffer)) != 0 ||
		    (t_ret = __os_seek(env,
		    dblp->lfhp, 0, 0, flush_lsn.offset)) != 0 ||
		    (t_ret = __os_write(env, dblp->lfhp, buffer,
		    nr, &nw)) != 0 || nw != nr) {
			/* Do not leak the scratch record on the panic path. */
			__os_free(env, buffer);
			return (__env_panic(env, t_ret == 0 ? EIO : t_ret));
		}
		__os_free(env, buffer);
	}

	/* Best-effort second flush; its result is deliberately ignored. */
	(void)__log_flush_int(dblp, &flush_lsn, 0);

	/* Report the original flush/write failure. */
	return (ret);
}
/*
 * __log_newfile --
 *	Start a new log file and write the persistent (LOGP) record as its
 *	first record.
 *
 * dblp:    log handle.
 * lsnp:    if non-NULL, receives lp->lsn as it stands after the LOGP
 *          record has been written.
 * logfile: if non-zero, the file number to switch to; if zero, the next
 *          file number is used (and only when the current file is not
 *          empty).
 * version: if non-zero, the log version to set before writing the header.
 *
 * Returns 0 on success or a non-zero error.
 */
int
__log_newfile(dblp, lsnp, logfile, version)
DB_LOG *dblp;
DB_LSN *lsnp;
u_int32_t logfile;
u_int32_t version;
{
DBT t;
DB_CIPHER *db_cipher;
DB_LSN lsn;
ENV *env;
HDR hdr;
LOG *lp;
LOGP *tpersist;
int need_free, ret;
u_int32_t lastoff;
size_t tsize;
env = dblp->env;
lp = dblp->reginfo.primary;
/*
 * No explicit file number and the current file is not empty: flush it
 * (without releasing the region lock), remember its final offset, and
 * move the LSN and write offset to the start of the next file.
 */
if (logfile == 0 && lp->lsn.offset != 0) {
if ((ret = __log_flush_int(dblp, NULL, 0)) != 0)
return (ret);
lastoff = lp->lsn.offset;
++lp->lsn.file;
lp->lsn.offset = 0;
lp->w_off = 0;
} else
lastoff = 0;
/*
 * An explicit file number resets the LSN to the start of that file.
 * In-memory logs are zeroed from that point; on-disk logs reset the
 * synced LSN and open (creating) the file.
 */
if (logfile != 0) {
lp->lsn.file = logfile;
lp->lsn.offset = 0;
lp->w_off = 0;
if (lp->db_log_inmemory) {
lsn = lp->lsn;
(void)__log_zero(env, &lsn);
} else {
lp->s_lsn = lp->lsn;
if ((ret = __log_newfh(dblp, 1)) != 0)
return (ret);
}
}
/* For on-disk logs the buffer must be empty at this point. */
DB_ASSERT(env, lp->db_log_inmemory || lp->b_off == 0);
if (lp->db_log_inmemory &&
(ret = __log_inmem_newfile(dblp, lp->lsn.file)) != 0)
return (ret);
/* Build the LOGP record; with crypto on it is enlarged by adj_size(). */
memset(&t, 0, sizeof(t));
memset(&hdr, 0, sizeof(HDR));
need_free = 0;
tsize = sizeof(LOGP);
db_cipher = env->crypto_handle;
if (CRYPTO_ON(env))
tsize += db_cipher->adj_size(tsize);
if ((ret = __os_calloc(env, 1, tsize, &tpersist)) != 0)
return (ret);
need_free = 1;
/* Apply a caller-requested log version before copying the header. */
if (version != 0) {
__log_set_version(env, version);
if ((ret = __env_init_rec(env, version)) != 0)
goto err;
}
/* The new file takes the configured "next" size, log_nsize. */
lp->persist.log_size = lp->log_size = lp->log_nsize;
memcpy(tpersist, &lp->persist, sizeof(LOGP));
DB_SET_DBT(t, tpersist, tsize);
if (LOG_SWAPPED(env))
__log_persistswap(tpersist);
if ((ret =
__log_encrypt_record(env, &t, &hdr, (u_int32_t)tsize)) != 0)
goto err;
/*
 * prev is 0 when there was no previous file position, otherwise the
 * offset of the last record in the previous file (lastoff - lp->len).
 */
if ((ret = __log_putr(dblp, &lsn,
&t, lastoff == 0 ? 0 : lastoff - lp->len, &hdr)) != 0)
goto err;
if (lsnp != NULL)
*lsnp = lp->lsn;
err: if (need_free)
__os_free(env, tpersist);
return (ret);
}
/*
 * __log_putr --
 *	Write one record (header followed by data) at the current log
 *	position and advance lp->lsn past it.
 *
 * dblp: log handle.
 * lsn:  out; set to lp->lsn as it was before the record was written.
 * dbt:  record payload.
 * prev: value stored in the header's prev field.
 * h:    header to use, or NULL to use a zeroed local header whose size is
 *       chosen from the crypto setting.
 *
 * On failure, b_off, w_off and f_lsn are restored to their values on
 * entry.  Returns 0 on success or a non-zero error (a panic result if the
 * restore itself fails).
 */
static int
__log_putr(dblp, lsn, dbt, prev, h)
DB_LOG *dblp;
DB_LSN *lsn;
const DBT *dbt;
u_int32_t prev;
HDR *h;
{
DB_CIPHER *db_cipher;
DB_LSN f_lsn;
ENV *env;
HDR tmp, *hdr;
LOG *lp;
int ret, t_ret;
db_size_t b_off;
size_t nr;
u_int32_t w_off;
env = dblp->env;
lp = dblp->reginfo.primary;
db_cipher = env->crypto_handle;
/* No header supplied: use a local one. */
if (h == NULL) {
hdr = &tmp;
memset(hdr, 0, sizeof(HDR));
if (CRYPTO_ON(env))
hdr->size = HDR_CRYPTO_SZ;
else
hdr->size = HDR_NORMAL_SZ;
} else
hdr = h;
/* Save the buffer/write position so it can be restored on failure. */
b_off = lp->b_off;
w_off = lp->w_off;
f_lsn = lp->f_lsn;
hdr->prev = prev;
hdr->len = (u_int32_t)hdr->size + dbt->size;
#ifdef HAVE_LOG_CHECKSUM
/*
 * A zero first checksum byte is treated as "not yet computed".  Logs
 * older than DB_LOGCHKSUM checksum the data only (NULL header); newer
 * ones include the header.  If a checksum was already supplied and the
 * log is at DB_LOGCHKSUM or later, LOG_HDR_SUM is applied now that prev
 * and len are set.
 */
if (hdr->chksum[0] == 0) {
if (lp->persist.version < DB_LOGCHKSUM)
__db_chksum(NULL, dbt->data, dbt->size,
(CRYPTO_ON(env)) ? db_cipher->mac_key : NULL,
hdr->chksum);
else
__db_chksum(hdr, dbt->data, dbt->size,
(CRYPTO_ON(env)) ? db_cipher->mac_key : NULL,
hdr->chksum);
} else if (lp->persist.version >= DB_LOGCHKSUM)
LOG_HDR_SUM(CRYPTO_ON(env), hdr, hdr->chksum);
#endif
/* In-memory logs must have room for the header plus the data. */
if (lp->db_log_inmemory && (ret = __log_inmem_chkspace(dblp,
(u_int32_t)hdr->size + dbt->size)) != 0)
goto err;
/* The record's LSN is the current end-of-log position. */
*lsn = lp->lsn;
/* Capture the header size before the header is byte-swapped. */
nr = hdr->size;
/* Swap the header for writing, then swap it back afterwards. */
if (LOG_SWAPPED(env))
__log_hdrswap(hdr, CRYPTO_ON(env));
ret = __log_fill(dblp, lsn, hdr, (u_int32_t)nr);
if (LOG_SWAPPED(env))
__log_hdrswap(hdr, CRYPTO_ON(env));
if (ret != 0)
goto err;
if ((ret = __log_fill(dblp, lsn, dbt->data, dbt->size)) != 0)
goto err;
/* Record the length just written and advance the end-of-log offset. */
lp->len = (u_int32_t)(hdr->size + dbt->size);
lp->lsn.offset += lp->len;
return (0);
err:
/*
 * If the write offset advanced by more than one buffer_size since entry,
 * re-read the first b_off bytes at the saved write offset from the file
 * into the log buffer before restoring the saved position.
 */
if (w_off + lp->buffer_size < lp->w_off) {
DB_ASSERT(env, !lp->db_log_inmemory);
if ((t_ret = __os_seek(env, dblp->lfhp, 0, 0, w_off)) != 0 ||
(t_ret = __os_read(env, dblp->lfhp, dblp->bufp,
b_off, &nr)) != 0)
return (__env_panic(env, t_ret));
if (nr != b_off) {
__db_errx(env, DB_STR("2515",
"Short read while restoring log"));
return (__env_panic(env, EIO));
}
}
/* Put the buffer/write position back where it was on entry. */
lp->w_off = w_off;
lp->b_off = b_off;
lp->f_lsn = f_lsn;
return (ret);
}
/*
 * __log_flush_pp --
 *	Front end for DB_ENV->log_flush: checks that logging is configured
 *	and calls __log_flush inside the ENV_ENTER/ENV_LEAVE and
 *	REPLICATION_WRAP brackets.
 *
 * Returns the result of __log_flush (or of the bracketing macros).
 */
int
__log_flush_pp(dbenv, lsn)
	DB_ENV *dbenv;
	const DB_LSN *lsn;
{
	DB_THREAD_INFO *ip;
	ENV *env;
	int ret;

	env = dbenv->env;

	ENV_REQUIRES_CONFIG(env,
	    env->lg_handle, "DB_ENV->log_flush", DB_INIT_LOG);

	ENV_ENTER(env, ip);
	REPLICATION_WRAP(env, (__log_flush(env, lsn)), 0, ret);
	ENV_LEAVE(env, ip);

	return (ret);
}
/*
 * ALREADY_FLUSHED --
 *	True when the log's s_lsn is strictly greater than *lsnp, comparing
 *	the file number first and the offset second.  s_lsn is assigned in
 *	__log_flush_int after a successful __os_fsync.
 */
#define ALREADY_FLUSHED(lp, lsnp) \
(((lp)->s_lsn.file > (lsnp)->file) || \
((lp)->s_lsn.file == (lsnp)->file && \
(lp)->s_lsn.offset > (lsnp)->offset))
/*
 * __log_flush --
 *	Flush the log through the given LSN (the whole log when lsn is
 *	NULL).
 *
 * The ALREADY_FLUSHED test is made before the region lock is taken, so a
 * record that is already synced costs no lock acquisition.  Otherwise
 * __log_flush_int is called with the region lock held and with permission
 * to release it (release == 1).
 *
 * Returns 0 on success or the error from __log_flush_int.
 */
int
__log_flush(env, lsn)
	ENV *env;
	const DB_LSN *lsn;
{
	DB_LOG *dblp;
	LOG *lp;
	int ret;

	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;

	/* Nothing to do if the target is already on stable storage. */
	if (lsn != NULL && ALREADY_FLUSHED(lp, lsn))
		return (0);

	LOG_SYSTEM_LOCK(env);
	ret = __log_flush_int(dblp, lsn, 1);
	LOG_SYSTEM_UNLOCK(env);

	return (ret);
}
/*
 * __log_flush_int --
 *	Flush the log to disk through a given LSN; internal version.
 *
 * dblp:    log handle.
 * lsnp:    LSN to flush through, or NULL to flush through the last record
 *          written.
 * release: when non-zero, the log region lock may be dropped while
 *          waiting for another flush and around the fsync call.
 *
 * The log region lock is released and re-acquired inside this function
 * when release is set, so the caller is expected to hold it on entry.
 * Returns 0 on success or a non-zero error; an LSN past the end of the
 * log panics the environment.
 */
int
__log_flush_int(dblp, lsnp, release)
DB_LOG *dblp;
const DB_LSN *lsnp;
int release;
{
struct __db_commit *commit;
ENV *env;
DB_LSN flush_lsn, f_lsn;
LOG *lp;
size_t b_off;
u_int32_t ncommit, w_off;
int do_flush, first, ret;
env = dblp->env;
lp = dblp->reginfo.primary;
ncommit = 0;
ret = 0;
/* In-memory logs: just mark everything as synced. */
if (lp->db_log_inmemory) {
lp->s_lsn = lp->lsn;
STAT(++lp->stat.st_scount);
return (0);
}
/*
 * Pick the flush target: the last record written when no LSN was given;
 * otherwise the given LSN, which must not lie beyond the last record.
 */
if (lsnp == NULL) {
flush_lsn.file = lp->lsn.file;
flush_lsn.offset = lp->lsn.offset - lp->len;
} else if (lsnp->file > lp->lsn.file ||
(lsnp->file == lp->lsn.file &&
lsnp->offset > lp->lsn.offset - lp->len)) {
__db_errx(env, DB_STR_A("2516",
"DB_ENV->log_flush: LSN of %lu/%lu past current end-of-log of %lu/%lu",
"%lu %lu %lu %lu"), (u_long)lsnp->file,
(u_long)lsnp->offset, (u_long)lp->lsn.file,
(u_long)lp->lsn.offset);
__db_errx(env, DB_STR("2517",
"Database environment corrupt; the wrong log files may "
"have been removed or incompatible database files "
"imported from another environment"));
return (__env_panic(env, DB_RUNRECOVERY));
} else {
if (ALREADY_FLUSHED(lp, lsnp))
return (0);
flush_lsn = *lsnp;
}
/*
 * Another flush is in progress and we may release the lock: queue a
 * commit structure (reusing one from free_commits or allocating a new
 * one) and block on its mutex until the flusher wakes us.
 */
if (release && lp->in_flush != 0) {
if ((commit = SH_TAILQ_FIRST(
&lp->free_commits, __db_commit)) == NULL) {
/* If the allocation fails, fall through and flush ourselves. */
if ((ret = __env_alloc(&dblp->reginfo,
sizeof(struct __db_commit), &commit)) != 0)
goto flush;
memset(commit, 0, sizeof(*commit));
if ((ret = __mutex_alloc(env, MTX_TXN_COMMIT,
DB_MUTEX_SELF_BLOCK, &commit->mtx_txnwait)) != 0) {
__env_alloc_free(&dblp->reginfo, commit);
return (ret);
}
/* Lock the new mutex once so the second lock below blocks. */
MUTEX_LOCK(env, commit->mtx_txnwait);
} else
SH_TAILQ_REMOVE(
&lp->free_commits, commit, links, __db_commit);
lp->ncommit++;
/* t_lsn only ever moves forward. */
if (LOG_COMPARE(&lp->t_lsn, &flush_lsn) < 0)
lp->t_lsn = flush_lsn;
commit->lsn = flush_lsn;
SH_TAILQ_INSERT_HEAD(
&lp->commits, commit, links, __db_commit);
/* Wait, without the region lock, for the flusher to unlock our mutex. */
LOG_SYSTEM_UNLOCK(env);
MUTEX_LOCK(env, commit->mtx_txnwait);
LOG_SYSTEM_LOCK(env);
lp->ncommit--;
/* Read the flag before the structure goes back on the free list. */
do_flush = F_ISSET(commit, DB_COMMIT_FLUSH);
F_CLR(commit, DB_COMMIT_FLUSH);
SH_TAILQ_INSERT_HEAD(
&lp->free_commits, commit, links, __db_commit);
/*
 * If we were chosen to do the next flush, flush through t_lsn;
 * otherwise our record was already synced and we are done.
 */
if (do_flush) {
lp->in_flush--;
flush_lsn = lp->t_lsn;
} else
return (0);
}
/* The flush itself is serialized by mtx_flush. */
flush: MUTEX_LOCK(env, lp->mtx_flush);
/* Nothing to do if the target is already below the synced LSN. */
if (flush_lsn.file < lp->s_lsn.file ||
(flush_lsn.file == lp->s_lsn.file &&
flush_lsn.offset < lp->s_lsn.offset)) {
MUTEX_UNLOCK(env, lp->mtx_flush);
goto done;
}
/*
 * Write the buffer if it holds data at or before the target; otherwise
 * make sure a handle for the current log file is open.
 */
if (lp->b_off != 0 && LOG_COMPARE(&flush_lsn, &lp->f_lsn) >= 0) {
if ((ret = __log_write(dblp,
dblp->bufp, (u_int32_t)lp->b_off)) != 0) {
MUTEX_UNLOCK(env, lp->mtx_flush);
goto done;
}
lp->b_off = 0;
} else if (dblp->lfhp == NULL || dblp->lfname != lp->lsn.file)
if ((ret = __log_newfh(dblp, 0)) != 0) {
MUTEX_UNLOCK(env, lp->mtx_flush);
goto done;
}
/*
 * Snapshot the buffer state before (possibly) dropping the region lock,
 * since it may change while the fsync runs.
 */
b_off = lp->b_off;
w_off = lp->w_off;
f_lsn = lp->f_lsn;
lp->in_flush++;
if (release)
LOG_SYSTEM_UNLOCK(env);
if ((ret = __os_fsync(env, dblp->lfhp)) != 0) {
MUTEX_UNLOCK(env, lp->mtx_flush);
if (release)
LOG_SYSTEM_LOCK(env);
lp->in_flush--;
goto done;
}
/*
 * Record the new synced LSN from the snapshot: the buffer's first LSN,
 * or the write offset if the buffer was empty.
 */
lp->s_lsn = f_lsn;
if (b_off == 0)
lp->s_lsn.offset = w_off;
MUTEX_UNLOCK(env, lp->mtx_flush);
if (release)
LOG_SYSTEM_LOCK(env);
lp->in_flush--;
STAT(++lp->stat.st_scount);
/* This call itself counts as one commit for the statistics below. */
ncommit = 1;
done:
/*
 * Wake waiters: every waiter whose LSN is below the synced LSN is
 * released, and the first waiter that is not is flagged with
 * DB_COMMIT_FLUSH so that it performs the next flush.
 */
if (lp->ncommit != 0) {
first = 1;
SH_TAILQ_FOREACH(commit, &lp->commits, links, __db_commit)
if (LOG_COMPARE(&lp->s_lsn, &commit->lsn) > 0) {
MUTEX_UNLOCK(env, commit->mtx_txnwait);
SH_TAILQ_REMOVE(
&lp->commits, commit, links, __db_commit);
ncommit++;
} else if (first == 1) {
F_SET(commit, DB_COMMIT_FLUSH);
MUTEX_UNLOCK(env, commit->mtx_txnwait);
SH_TAILQ_REMOVE(
&lp->commits, commit, links, __db_commit);
/* Counted here; the woken thread decrements it when it runs. */
lp->in_flush++;
first = 0;
}
}
#ifdef HAVE_STATISTICS
if (lp->stat.st_maxcommitperflush < ncommit)
lp->stat.st_maxcommitperflush = ncommit;
if (lp->stat.st_mincommitperflush > ncommit ||
lp->stat.st_mincommitperflush == 0)
lp->stat.st_mincommitperflush = ncommit;
#endif
return (ret);
}
static int
__log_fill(dblp, lsn, addr, len)
DB_LOG *dblp;
DB_LSN *lsn;
void *addr;
u_int32_t len;
{
LOG *lp;
u_int32_t bsize, nrec;
size_t nw, remain;
int ret;
lp = dblp->reginfo.primary;
bsize = lp->buffer_size;
if (lp->db_log_inmemory) {
__log_inmem_copyin(dblp, lp->b_off, addr, len);
lp->b_off = (lp->b_off + len) % lp->buffer_size;
return (0);
}
while (len > 0) {
if (lp->b_off == 0)
lp->f_lsn = *lsn;
if (lp->b_off == 0 && len >= bsize) {
nrec = len / bsize;
if ((ret = __log_write(dblp, addr, nrec * bsize)) != 0)
return (ret);
addr = (u_int8_t *)addr + nrec * bsize;
len -= nrec * bsize;
STAT(++lp->stat.st_wcount_fill);
continue;
}
remain = bsize - lp->b_off;
nw = remain > len ? len : remain;
memcpy(dblp->bufp + lp->b_off, addr, nw);
addr = (u_int8_t *)addr + nw;
len -= (u_int32_t)nw;
lp->b_off += (u_int32_t)nw;
if (lp->b_off == bsize) {
if ((ret = __log_write(dblp, dblp->bufp, bsize)) != 0)
return (ret);
lp->b_off = 0;
STAT(++lp->stat.st_wcount_fill);
}
}
return (0);
}
/*
 * __log_write --
 *	Write len bytes at addr to the current log file at offset w_off,
 *	then advance w_off and the byte counters.
 *
 * A new file handle is obtained first when none is open, when the open
 * handle belongs to a different file number, or when its timestamp does
 * not match the region's; the file is created when w_off is zero.  When
 * writing at offset zero the file is pre-extended to log_size (and
 * zero-filled if DBLOG_ZERO is set); errors from that step are ignored.
 *
 * Returns 0 on success or the error from __log_newfh / __os_io.
 */
static int
__log_write(dblp, addr, len)
	DB_LOG *dblp;
	void *addr;
	u_int32_t len;
{
	ENV *env;
	LOG *lp;
	size_t nw;
	int ret, stale_fh;

	env = dblp->env;
	lp = dblp->reginfo.primary;

	DB_ASSERT(env, !lp->db_log_inmemory);

	/* Make sure the handle we hold is for the current log file. */
	stale_fh = dblp->lfhp == NULL ||
	    dblp->lfname != lp->lsn.file ||
	    dblp->lf_timestamp != lp->timestamp;
	if (stale_fh) {
		ret = __log_newfh(dblp, lp->w_off == 0);
		if (ret != 0)
			return (ret);
	}

	/* First write to a file: pre-size it, best effort. */
#ifdef HAVE_FILESYSTEM_NOTZERO
	if (lp->w_off == 0 && !__os_fs_notzero()) {
#else
	if (lp->w_off == 0) {
#endif
		(void)__db_file_extend(env, dblp->lfhp, lp->log_size);
		if (F_ISSET(dblp, DBLOG_ZERO))
			(void)__db_zero_extend(env, dblp->lfhp,
			    0, lp->log_size/lp->buffer_size, lp->buffer_size);
	}

	ret = __os_io(env, DB_IO_WRITE,
	    dblp->lfhp, 0, 0, lp->w_off, len, addr, &nw);
	if (ret != 0)
		return (ret);

	lp->w_off += len;

	/* Bytes written since the last checkpoint, kept as MB + bytes. */
	lp->stat.st_wc_bytes += len;
	if (lp->stat.st_wc_bytes >= MEGABYTE) {
		lp->stat.st_wc_bytes -= MEGABYTE;
		++lp->stat.st_wc_mbytes;
	}
#ifdef HAVE_STATISTICS
	lp->stat.st_w_bytes += len;
	if (lp->stat.st_w_bytes >= MEGABYTE) {
		lp->stat.st_w_bytes -= MEGABYTE;
		++lp->stat.st_w_mbytes;
	}
	++lp->stat.st_wcount;
#endif

	return (0);
}
/*
 * __log_file_pp --
 *	Front end for DB_ENV->log_file: checks that logging is configured
 *	and that the log is not in-memory, then calls __log_file inside
 *	the ENV_ENTER/ENV_LEAVE and REPLICATION_WRAP brackets.
 *
 * Returns 0 on success, EINVAL for in-memory logs, or the error from
 * __log_get_config / __log_file.
 */
int
__log_file_pp(dbenv, lsn, namep, len)
	DB_ENV *dbenv;
	const DB_LSN *lsn;
	char *namep;
	size_t len;
{
	DB_THREAD_INFO *ip;
	ENV *env;
	int in_memory, ret;

	env = dbenv->env;

	ENV_REQUIRES_CONFIG(env,
	    env->lg_handle, "DB_ENV->log_file", DB_INIT_LOG);

	ret = __log_get_config(dbenv, DB_LOG_IN_MEMORY, &in_memory);
	if (ret != 0)
		return (ret);

	/* In-memory logs have no file names to report. */
	if (in_memory) {
		__db_errx(env, DB_STR("2518",
		    "DB_ENV->log_file is illegal with in-memory logs"));
		return (EINVAL);
	}

	ENV_ENTER(env, ip);
	REPLICATION_WRAP(env, (__log_file(env, lsn, namep, len)), 0, ret);
	ENV_LEAVE(env, ip);

	return (ret);
}
/*
 * __log_file --
 *	Copy the name of the log file containing the given LSN into the
 *	caller's buffer.
 *
 * env:   environment handle.
 * lsn:   LSN whose file number selects the log file.
 * namep: caller's buffer for the NUL-terminated name.
 * len:   size of namep in bytes.
 *
 * Returns 0 on success, EINVAL when the buffer is too small (the buffer
 * is set to the empty string if it has room for one byte), or the error
 * from __log_name.  The name allocated by __log_name is released on every
 * path.
 */
static int
__log_file(env, lsn, namep, len)
	ENV *env;
	const DB_LSN *lsn;
	char *namep;
	size_t len;
{
	DB_LOG *dblp;
	size_t need;
	int ret;
	char *name;

	dblp = env->lg_handle;

	LOG_SYSTEM_LOCK(env);
	ret = __log_name(dblp, lsn->file, &name, NULL, 0);
	LOG_SYSTEM_UNLOCK(env);
	if (ret != 0)
		return (ret);

	/* Room needed, including the terminating NUL. */
	need = strlen(name) + 1;
	if (len < need) {
		/* Never store through a zero-length buffer. */
		if (len > 0)
			*namep = '\0';
		__db_errx(env, DB_STR("2519",
		    "DB_ENV->log_file: name buffer is too short"));
		ret = EINVAL;
	} else
		memcpy(namep, name, need);

	/* Release the name on both the success and the failure path. */
	__os_free(env, name);
	return (ret);
}
/*
 * __log_newfh --
 *	Close the log file handle held in dblp (if any) and open one for
 *	the file numbered lp->lsn.file.
 *
 * dblp:   log handle; lfhp and lfname are updated.
 * create: non-zero to add DB_OSO_CREATE to the open flags.
 *
 * Returns 0 on success, the error from __log_valid (after reporting it),
 * or DB_NOTFOUND when __log_valid succeeds but reports a status other
 * than DB_LV_NORMAL, DB_LV_INCOMPLETE or DB_LV_OLD_READABLE.
 */
static int
__log_newfh(dblp, create)
	DB_LOG *dblp;
	int create;
{
	ENV *env;
	LOG *lp;
	u_int32_t flags;
	int ret;
	logfile_validity status;

	env = dblp->env;
	lp = dblp->reginfo.primary;

	/* Drop whatever handle we were holding. */
	if (dblp->lfhp != NULL) {
		(void)__os_closehandle(env, dblp->lfhp);
		dblp->lfhp = NULL;
	}

	/* Assemble the open flags. */
	flags = DB_OSO_SEQ;
	if (create)
		flags |= DB_OSO_CREATE;
	if (F_ISSET(dblp, DBLOG_DIRECT))
		flags |= DB_OSO_DIRECT;
	if (F_ISSET(dblp, DBLOG_DSYNC))
		flags |= DB_OSO_DSYNC;

	/* Open (and validate) the current log file. */
	dblp->lfname = lp->lsn.file;
	ret = __log_valid(dblp, dblp->lfname, 0, &dblp->lfhp,
	    flags, &status, NULL);
	if (ret != 0) {
		__db_err(env, ret,
		    "DB_ENV->log_newfh: %lu", (u_long)lp->lsn.file);
		return (ret);
	}

	if (!(status == DB_LV_NORMAL || status == DB_LV_INCOMPLETE ||
	    status == DB_LV_OLD_READABLE))
		ret = DB_NOTFOUND;

	return (ret);
}
/*
 * __log_name --
 *	Build the path of a log file and, optionally, open it.
 *
 * dblp:       log handle.
 * filenumber: log file number.
 * namep:      out; receives an allocated path that the caller frees.
 * fhpp:       if NULL, only the name is built; otherwise receives the
 *             open file handle.
 * flags:      flags passed to __os_open.
 *
 * The current-style name (LFNAME) is tried first.  If that open fails
 * with ENOENT and the caller asked for DB_OSO_RDONLY, the old-style name
 * (LFNAME_V1) is tried; when that succeeds, *namep is replaced by the
 * old-style path.  Any other open failure panics the environment.
 *
 * Returns 0 on success or a non-zero error.
 */
int
__log_name(dblp, filenumber, namep, fhpp, flags)
	DB_LOG *dblp;
	u_int32_t filenumber, flags;
	char **namep;
	DB_FH **fhpp;
{
	ENV *env;
	LOG *lp;
	int mode, ret;
	char *oname;
	char old[sizeof(LFPREFIX) + 5 + 20], new[sizeof(LFPREFIX) + 10 + 20];

	env = dblp->env;
	lp = dblp->reginfo.primary;

	/*
	 * The error path frees oname; start it out as NULL so that a failed
	 * __db_appname cannot leave an indeterminate pointer to be freed.
	 */
	oname = NULL;

	DB_ASSERT(env, !lp->db_log_inmemory);

	/* Build the current-style name; stop here if no open was asked for. */
	(void)snprintf(new, sizeof(new), LFNAME, filenumber);
	if ((ret = __db_appname(env,
	    DB_APP_LOG, new, NULL, namep)) != 0 || fhpp == NULL)
		return (ret);

	/* A configured file mode is applied as an absolute mode. */
	if (lp->filemode == 0)
		mode = env->db_mode;
	else {
		LF_SET(DB_OSO_ABSMODE);
		mode = lp->filemode;
	}

	/* Try the current-style name. */
	dblp->lf_timestamp = lp->timestamp;
	if ((ret = __os_open(env, *namep, 0, flags, mode, fhpp)) == 0)
		return (0);

	/* Anything other than "no such file" is fatal. */
	if (ret != ENOENT) {
		__db_err(env, ret, DB_STR_A("2520",
		    "%s: log file unreadable", "%s"), *namep);
		return (__env_panic(env, ret));
	}

	/* Only read-only opens fall back to the old-style name. */
	if (!LF_ISSET(DB_OSO_RDONLY)) {
		__db_err(env, ret, DB_STR_A("2521",
		    "%s: log file open failed", "%s"), *namep);
		return (__env_panic(env, ret));
	}

	/* Build and try the old-style name. */
	(void)snprintf(old, sizeof(old), LFNAME_V1, filenumber);
	if ((ret = __db_appname(env,
	    DB_APP_LOG, old, NULL, &oname)) != 0)
		goto err;

	if ((ret = __os_open(env, oname, 0, flags, mode, fhpp)) == 0) {
		/* Hand the old-style path back instead of the new one. */
		__os_free(env, *namep);
		*namep = oname;
		return (0);
	}

	/* Neither name opened; *namep keeps the current-style path. */
err:	if (oname != NULL)
		__os_free(env, oname);
	return (ret);
}
/*
 * __log_rep_put --
 *	Write a log record at the LSN given in *lsnp; the assertion below
 *	requires that LSN to equal the current end of log.
 *	NOTE(review): the name suggests this is the replication-client
 *	apply path -- confirm against the callers.
 *
 * The record is copied into private memory (enlarged by adj_size() when
 * crypto is on), encrypted if required, and written with __log_putr, all
 * under the log region lock.  Whether or not the write succeeded,
 * ready_lsn is set to the current lsn, the checkpoint byte counters are
 * reset for DB_LOG_CHKPNT, and the record count statistic is bumped.
 *
 * Returns 0 on success or a non-zero error.
 */
int
__log_rep_put(env, lsnp, rec, flags)
	ENV *env;
	DB_LSN *lsnp;
	const DBT *rec;
	u_int32_t flags;
{
	DBT *dbt, t;
	DB_CIPHER *db_cipher;
	DB_LOG *dblp;
	HDR hdr;
	LOG *lp;
	int need_free, ret;

	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;
	need_free = 0;

	LOG_SYSTEM_LOCK(env);

	memset(&hdr, 0, sizeof(HDR));
	t = *rec;
	dbt = &t;
	db_cipher = env->crypto_handle;

	/* Private copy of the record, with room for cipher padding. */
	if (CRYPTO_ON(env))
		t.size += db_cipher->adj_size(rec->size);
	ret = __os_calloc(env, 1, t.size, &t.data);
	if (ret == 0) {
		need_free = 1;
		memcpy(t.data, rec->data, rec->size);
		ret = __log_encrypt_record(env, dbt, &hdr, rec->size);
	}

	if (ret == 0) {
		DB_ASSERT(env, LOG_COMPARE(lsnp, &lp->lsn) == 0);
		ret = __log_putr(dblp,
		    lsnp, dbt, lp->lsn.offset - lp->len, &hdr);
	}

	/* The bookkeeping below runs on success and on failure alike. */
	lp->ready_lsn = lp->lsn;
	if (LF_ISSET(DB_LOG_CHKPNT))
		lp->stat.st_wc_bytes = lp->stat.st_wc_mbytes = 0;
	STAT(++lp->stat.st_record);

	LOG_SYSTEM_UNLOCK(env);

	if (need_free)
		__os_free(env, t.data);
	return (ret);
}
/*
 * __log_encrypt_record --
 *	Set the header size for a record and, when encryption is on,
 *	encrypt the record in place.
 *
 * env:  environment handle.
 * dbt:  record; dbt->data is encrypted in place over dbt->size bytes.
 * hdr:  header; size is always set, orig_size and iv only with crypto.
 * orig: record size before any padding, stored in hdr->orig_size.
 *
 * Returns 0 on success or the cipher's error.
 */
static int
__log_encrypt_record(env, dbt, hdr, orig)
	ENV *env;
	DBT *dbt;
	HDR *hdr;
	u_int32_t orig;
{
	DB_CIPHER *db_cipher;

	/* Without encryption only the header size needs setting. */
	if (!CRYPTO_ON(env)) {
		hdr->size = HDR_NORMAL_SZ;
		return (0);
	}

	db_cipher = env->crypto_handle;
	hdr->size = HDR_CRYPTO_SZ;
	hdr->orig_size = orig;

	return (db_cipher->encrypt(env, db_cipher->data,
	    hdr->iv, dbt->data, dbt->size));
}
/*
 * __log_put_record_pp --
 *	Argument-checking front end for DB_ENV->log_put_record.
 *
 * Performs the same configuration, flag and replication-client checks as
 * __log_put_pp, then passes the variable arguments on to
 * __log_put_record_int inside the ENV_ENTER/ENV_LEAVE and
 * REPLICATION_WRAP brackets.
 *
 * Returns 0 on success, EINVAL for a replication client, or the error
 * from the flag checks or from __log_put_record_int.
 */
#ifdef STDC_HEADERS
int
__log_put_record_pp(DB_ENV *dbenv, DB *dbp, DB_TXN *txnp, DB_LSN *ret_lsnp,
    u_int32_t flags, u_int32_t rectype, u_int32_t has_data, u_int32_t size,
    DB_LOG_RECSPEC *spec, ...)
#else
int
__log_put_record_pp(dbenv, dbp, txnp, ret_lsnp,
    flags, rectype, has_data, size,
    spec, va_alist)
	DB_ENV *dbenv;
	DB *dbp;
	DB_TXN *txnp;
	DB_LSN *ret_lsnp;
	u_int32_t flags;
	u_int32_t rectype;
	u_int32_t has_data;
	u_int32_t size;
	DB_LOG_RECSPEC *spec;
	va_dcl
#endif
{
	DB_THREAD_INFO *ip;
	ENV *env;
	va_list argp;
	int ret;

	env = dbenv->env;

	ENV_REQUIRES_CONFIG(env,
	    env->lg_handle, "DB_ENV->log_put_record", DB_INIT_LOG);

	/* Only this set of flags is accepted. */
	ret = __db_fchk(env, "DB_ENV->log_put_record", flags,
	    DB_LOG_CHKPNT | DB_LOG_COMMIT |
	    DB_FLUSH | DB_LOG_NOCOPY | DB_LOG_WRNOSYNC);
	if (ret != 0)
		return (ret);

	/* DB_FLUSH and DB_LOG_WRNOSYNC may not be combined. */
	if (LF_ISSET(DB_FLUSH) && LF_ISSET(DB_LOG_WRNOSYNC))
		return (__db_ferr(env, "DB_ENV->log_put_record", 1));

	/* Replication clients may not write log records through this API. */
	if (IS_REP_CLIENT(env)) {
		__db_errx(env, DB_STR("2522",
		    "DB_ENV->log_put is illegal on replication clients"));
		return (EINVAL);
	}

	ENV_ENTER(env, ip);
	va_start(argp, spec);
	REPLICATION_WRAP(env, (__log_put_record_int(env, dbp,
	    txnp, ret_lsnp, flags, rectype, has_data, size, spec, argp)),
	    0, ret);
	va_end(argp);
	ENV_LEAVE(env, ip);

	return (ret);
}
#ifdef STDC_HEADERS
int
__log_put_record(ENV *env, DB *dbp, DB_TXN *txnp, DB_LSN *ret_lsnp,
u_int32_t flags, u_int32_t rectype, u_int32_t has_data, u_int32_t size,
DB_LOG_RECSPEC *spec, ...)
#else
int
__log_put_record(env, dbp, txnp, ret_lsnp,
flags, rectype, has_data, size, spec, va_alist);
ENV *env;
DB *dbp;
DB_TXN *txnp;
DB_LSN *ret_lsnp;
u_int32_t flags;
u_int32_t rectype;
u_int32_t has_data;
u_int32_t size;
DB_LOG_RECSPEC *spec;
va_dcl
#endif
{
va_list argp;
int ret;
va_start(argp, spec);
ret = __log_put_record_int(env, dbp, txnp, ret_lsnp, flags,
rectype, has_data, size, spec, argp);
va_end(argp);
return (ret);
}
#ifdef STDC_HEADERS
static int
__log_put_record_int(ENV *env, DB *dbp, DB_TXN *txnp, DB_LSN *ret_lsnp,
u_int32_t flags, u_int32_t rectype, u_int32_t has_data, u_int32_t size,
DB_LOG_RECSPEC *spec, va_list argp)
#else
int
__log_put_record_int(env, dbp, txnp, ret_lsnp,
flags, rectype, has_data, size, spec, argp);
ENV *env;
DB *dbp;
DB_TXN *txnp;
DB_LSN *ret_lsnp;
u_int32_t flags;
u_int32_t has_data;
u_int32_t size;
u_int32_t rectype;
DB_LOG_RECSPEC *spec;
va_list argp;
#endif
{
DBT *data, *dbt, *header, logrec;
DB_LOG_RECSPEC *sp;
DB_LSN *lsnp, lsn, null_lsn, *pagelsn, *rlsnp;
DB_TXNLOGREC *lr;
LOG *lp;
PAGE *pghdrstart;
u_int32_t hdrsize, op, zero, uinttmp, txn_num;
u_int npad;
u_int8_t *bp;
int is_durable, ret;
void *hdrstart;
COMPQUIET(lr, NULL);
COMPQUIET(hdrsize, 0);
COMPQUIET(op, 0);
COMPQUIET(hdrstart, NULL);
COMPQUIET(pghdrstart, NULL);
COMPQUIET(header, NULL);
if (LF_ISSET(DB_LOG_COMMIT))
rlsnp = ret_lsnp;
else
rlsnp = &lsn;
npad = 0;
ret = 0;
data = NULL;
if (LF_ISSET(DB_LOG_NOT_DURABLE) ||
(dbp != NULL && F_ISSET(dbp, DB_AM_NOT_DURABLE))) {
if (txnp == NULL)
return (0);
is_durable = 0;
} else
is_durable = 1;
if (txnp == NULL) {
txn_num = 0;
lsnp = &null_lsn;
null_lsn.file = null_lsn.offset = 0;
} else {
if (TAILQ_FIRST(&txnp->kids) != NULL &&
(ret = __txn_activekids(env, rectype, txnp)) != 0)
return (ret);
DB_SET_TXN_LSNP(txnp, &rlsnp, &lsnp);
txn_num = txnp->txnid;
}
if (dbp != NULL) {
DB_ASSERT(env, dbp->log_filename != NULL);
if (dbp->log_filename->id == DB_LOGFILEID_INVALID &&
(ret = __dbreg_lazy_id(dbp)) != 0)
return (ret);
}
logrec.size = size;
if (CRYPTO_ON(env)) {
npad = env->crypto_handle->adj_size(logrec.size);
logrec.size += npad;
}
if (is_durable || txnp == NULL) {
if ((ret =
__os_malloc(env, logrec.size, &logrec.data)) != 0)
return (ret);
} else {
if ((ret = __os_malloc(env,
logrec.size + sizeof(DB_TXNLOGREC), &lr)) != 0)
return (ret);
#ifdef DIAGNOSTIC
if ((ret =
__os_malloc(env, logrec.size, &logrec.data)) != 0) {
__os_free(env, lr);
return (ret);
}
#else
logrec.data = lr->data;
#endif
}
if (npad > 0)
memset((u_int8_t *)logrec.data + logrec.size - npad, 0, npad);
bp = logrec.data;
LOGCOPY_32(env, bp, &rectype);
bp += sizeof(rectype);
LOGCOPY_32(env, bp, &txn_num);
bp += sizeof(txn_num);
LOGCOPY_FROMLSN(env, bp, lsnp);
bp += sizeof(DB_LSN);
zero = 0;
lp = env->lg_handle->reginfo.primary;
for (sp = spec; sp->type != LOGREC_Done; sp++) {
switch (sp->type) {
case LOGREC_DB:
uinttmp = (u_int32_t)dbp->log_filename->id;
LOGCOPY_32(env, bp, &uinttmp);
bp += sizeof(uinttmp);
break;
case LOGREC_ARG:
case LOGREC_TIME:
case LOGREC_DBOP:
uinttmp = va_arg(argp, u_int32_t);
LOGCOPY_32(env, bp, &uinttmp);
bp += sizeof(uinttmp);
break;
case LOGREC_OP:
op = va_arg(argp, u_int32_t);
LOGCOPY_32(env, bp, &op);
bp += sizeof(uinttmp);
break;
case LOGREC_DBT:
case LOGREC_PGLIST:
case LOGREC_LOCKS:
case LOGREC_HDR:
case LOGREC_DATA:
dbt = va_arg(argp, DBT *);
if (dbt == NULL) {
LOGCOPY_32(env, bp, &zero);
bp += sizeof(u_int32_t);
} else {
LOGCOPY_32(env, bp, &dbt->size);
bp += sizeof(dbt->size);
memcpy(bp, dbt->data, dbt->size);
}
if (dbp != NULL && F_ISSET(dbp, DB_AM_SWAP)) {
if (sp->type == LOGREC_HDR &&
dbt != NULL && has_data == 0)
__db_recordswap(op,
dbt->size, bp, NULL, 0);
else if (sp->type == LOGREC_HDR) {
hdrstart = bp;
hdrsize = dbt == NULL ? 0 : dbt->size;
} else if (sp->type == LOGREC_DATA) {
__db_recordswap(op,
hdrsize, hdrstart, bp, 0);
has_data = 0;
}
}
if (dbt != NULL)
bp += dbt->size;
break;
case LOGREC_PGDBT:
header = va_arg(argp, DBT *);
if (header == NULL) {
LOGCOPY_32(env, bp, &zero);
bp += sizeof(u_int32_t);
} else {
LOGCOPY_32(env, bp, &header->size);
bp += sizeof(header->size);
pghdrstart = (PAGE *)bp;
memcpy(bp, header->data, header->size);
if (has_data == 0 &&
F_ISSET(dbp, DB_AM_SWAP) &&
(ret = __db_pageswap(
env, dbp, pghdrstart, (size_t)header->size,
NULL, 0)) != 0)
return (ret);
bp += header->size;
}
break;
case LOGREC_PGDDBT:
data = va_arg(argp, DBT *);
if (data == NULL) {
zero = 0;
LOGCOPY_32(env, bp, &zero);
bp += sizeof(u_int32_t);
} else {
if (F_ISSET(dbp, DB_AM_SWAP) &&
(ret = __db_pageswap(env, dbp, pghdrstart,
(size_t)header->size, (DBT *)data, 0)) != 0)
return (ret);
LOGCOPY_32(env, bp, &data->size);
bp += sizeof(data->size);
memcpy(bp, data->data, data->size);
if (F_ISSET(dbp, DB_AM_SWAP) &&
F_ISSET(data, DB_DBT_APPMALLOC))
__os_free(env, data->data);
bp += data->size;
}
break;
case LOGREC_POINTER:
pagelsn = va_arg(argp, DB_LSN *);
if (pagelsn != NULL) {
if (txnp != NULL) {
if (LOG_COMPARE(pagelsn,
&lp->lsn) >= 0 && (ret =
__log_check_page_lsn(env,
dbp, pagelsn)) != 0)
return (ret);
}
LOGCOPY_FROMLSN(env, bp, pagelsn);
} else
memset(bp, 0, sizeof(*pagelsn));
bp += sizeof(*pagelsn);
break;
default:
DB_ASSERT(env, sp->type != sp->type);
}
}
DB_ASSERT(env,
(u_int32_t)(bp - (u_int8_t *)logrec.data) <= logrec.size);
if (is_durable || txnp == NULL) {
if ((ret = __log_put(env, rlsnp,(DBT *)&logrec,
flags | DB_LOG_NOCOPY)) == 0) {
if (txnp != NULL)
*lsnp = *rlsnp;
*ret_lsnp = *rlsnp;
}
} else {
ret = 0;
#ifdef DIAGNOSTIC
memcpy(lr->data, logrec.data, logrec.size);
rectype |= DB_debug_FLAG;
LOGCOPY_32(env, logrec.data, &rectype);
if (!IS_REP_CLIENT(env) && !lp->db_log_inmemory)
ret = __log_put(env,
rlsnp, (DBT *)&logrec, flags | DB_LOG_NOCOPY);
#endif
STAILQ_INSERT_HEAD(&txnp->logs, lr, links);
F_SET((TXN_DETAIL *)txnp->td, TXN_DTL_INMEMORY);
LSN_NOT_LOGGED(*ret_lsnp);
}
#ifdef LOG_DIAGNOSTIC
if (ret != 0)
(void)__db_addrem_print(env,
(DBT *)&logrec, ret_lsnp, DB_TXN_PRINT, NULL);
#endif
#ifdef DIAGNOSTIC
__os_free(env, logrec.data);
#else
if (is_durable || txnp == NULL)
__os_free(env, logrec.data);
#endif
return (ret);
}