use std::collections::VecDeque;
use bson::{doc, Document, Timestamp};
use mongodb::sync::Client as MongoClient;
use tracing::info;
use crate::{Result, SyncError, COMMAND_OP, OP_KEY, TIMESTAMP_KEY};
use super::oplog_bulk::execute_normal_oplogs;
use crate::cmd_oplog::CmdOplog;
/// Initial capacity reserved for the pending-oplog queue created by
/// `IncrDumper::new`. It is a pre-allocation hint only; nothing in this
/// section caps the queue at this size.
const BATCH_SIZE: usize = 10000;
/// Parses `cmd_log` as a command oplog and applies it through `mongo_conn`.
///
/// When `CmdOplog::from_oplog_doc` yields `None`, nothing is applied and the
/// call succeeds.
///
/// # Errors
///
/// Propagates any error from parsing the document or from applying the
/// parsed command.
fn apply_command_log(cmd_log: Document, mongo_conn: &MongoClient) -> Result<()> {
    match CmdOplog::from_oplog_doc(&cmd_log)? {
        // Nothing to apply for this document.
        None => {}
        Some(l) => {
            info!(?l, "begin to apply command oplog...");
            l.apply(mongo_conn)?;
        }
    }
    Ok(())
}
/// Sends every document in `oplogs` to the `admin` database as a single
/// `applyOps` command.
///
/// `oplogs` is emptied before the command is issued, so it is left empty
/// whether or not the command succeeds. The server reply is discarded.
///
/// # Errors
///
/// Returns the driver error, converted through `SyncError::from`, when the
/// command fails.
fn _execute_apply_ops_cmd(oplogs: &mut Vec<Document>, mongo_conn: &MongoClient) -> Result<()> {
    // Move the documents out of the caller's vector, leaving it empty.
    let pending: Vec<Document> = oplogs.drain(..).collect();
    let command = doc! {"applyOps": pending};

    let admin_db = mongo_conn.database("admin");
    let _reply = admin_db
        .run_command(command, None)
        .map_err(SyncError::from)?;
    Ok(())
}
/// Queues oplog documents and applies them through the MongoDB client it
/// holds.
#[derive(Debug)]
pub struct IncrDumper {
/// Oplog documents waiting to be applied; appended at the back and
/// consumed from the front.
oplog_batch: VecDeque<Document>,
/// Client used to apply the queued oplog documents.
mongo_conn: MongoClient,
}
impl IncrDumper {
    /// Creates a dumper with an empty queue (pre-allocated for `BATCH_SIZE`
    /// entries) that applies oplogs through `mongo_conn`.
    pub fn new(mongo_conn: MongoClient) -> Self {
        Self {
            oplog_batch: VecDeque::with_capacity(BATCH_SIZE),
            mongo_conn,
        }
    }

    /// Appends `oplogs` to the back of the pending queue, preserving their order.
    pub fn push_oplogs(&mut self, oplogs: Vec<Document>) {
        self.oplog_batch.extend(oplogs);
    }

    /// Applies one step of the pending queue.
    ///
    /// A step is either:
    /// * a single command oplog (an entry whose `OP_KEY` equals `COMMAND_OP`)
    ///   sitting at the front of the queue, or
    /// * the run of consecutive non-command oplogs at the front of the queue,
    ///   applied together through `execute_normal_oplogs`. The run stops
    ///   right before the next command oplog or at the end of the queue.
    ///
    /// # Returns
    ///
    /// `(has_more, latest_ts)` where `has_more` tells whether entries remain
    /// in the queue and `latest_ts` is the `TIMESTAMP_KEY` value of the last
    /// entry taken from the queue. If the queue was empty, `latest_ts` is the
    /// zero timestamp (`time: 0, increment: 0`).
    ///
    /// # Errors
    ///
    /// Fails when an entry lacks a usable `TIMESTAMP_KEY` or `OP_KEY` field,
    /// or when applying the entries fails. Entries already taken from the
    /// queue during the failing call are not put back.
    pub fn apply_oplogs(&mut self) -> Result<(bool, Timestamp)> {
        let mut latest_ts = Timestamp {
            increment: 0,
            time: 0,
        };
        let mut normal_oplogs = Vec::new();

        while let Some(one_log) = self.oplog_batch.pop_front() {
            latest_ts = one_log.get_timestamp(TIMESTAMP_KEY)?;

            if one_log.get_str(OP_KEY)? == COMMAND_OP {
                // A command can only be popped as the very first entry of a
                // call: the look-ahead below stops the loop before a command
                // that follows buffered normal oplogs. Hence there is never
                // anything to flush here.
                debug_assert!(normal_oplogs.is_empty());
                apply_command_log(one_log, &self.mongo_conn)?;
                return Ok((!self.oplog_batch.is_empty(), latest_ts));
            }

            normal_oplogs.push(one_log);

            // Stop before the next command so that it is applied on its own,
            // by a later call, after this run has been written.
            let command_is_next = match self.oplog_batch.front() {
                Some(next_oplog) => next_oplog.get_str(OP_KEY)? == COMMAND_OP,
                None => false,
            };
            if command_is_next {
                break;
            }
        }

        if !normal_oplogs.is_empty() {
            info!(?latest_ts, "Begin to apply oplogs... ");
            execute_normal_oplogs(&mut normal_oplogs, &self.mongo_conn)?;
            info!(?latest_ts, "Apply oplogs complete... ");
        }
        Ok((!self.oplog_batch.is_empty(), latest_ts))
    }
}