use crate::{Result, SyncError, COMMAND_OP, NAMESPACE_KEY, OP_KEY, TIMESTAMP_KEY};
use bson::Document;
use bson::{doc, Timestamp};
use mongodb::options::{FindOneOptions, FindOptions};
use mongodb::sync::Collection;
use std::collections::HashSet;
enum Natural {
Earliest,
Latest,
}
pub fn get_earliest_ts(coll: &Collection<Document>) -> Result<Timestamp> {
get_one_oplog_ts(coll, Natural::Earliest)
}
pub fn get_latest_ts(coll: &Collection<Document>) -> Result<Timestamp> {
get_one_oplog_ts(coll, Natural::Latest)
}
fn get_one_oplog_ts(coll: &Collection<Document>, natural: Natural) -> Result<Timestamp> {
let sorted_doc = match natural {
Natural::Earliest => doc! {"$natural": 1},
Natural::Latest => doc! {"$natural": -1},
};
coll.find_one(None, FindOneOptions::builder().sort(sorted_doc).build())?
.map(|d| d.get_timestamp(TIMESTAMP_KEY).map_err(SyncError::from))
.unwrap_or_else(|| Err(SyncError::EmptyDocError))
}
pub fn get_earliest_ts_no_capped(coll: &Collection<Document>) -> Result<Timestamp> {
get_one_oplog_ts_no_capped(coll, Natural::Earliest)
}
pub fn get_latest_ts_no_capped(coll: &Collection<Document>) -> Result<Timestamp> {
get_one_oplog_ts_no_capped(coll, Natural::Latest)
}
fn get_one_oplog_ts_no_capped(coll: &Collection<Document>, natural: Natural) -> Result<Timestamp> {
let sorted_doc = match natural {
Natural::Earliest => doc! {TIMESTAMP_KEY: 1},
Natural::Latest => doc! {TIMESTAMP_KEY: -1},
};
coll.find_one(None, FindOneOptions::builder().sort(sorted_doc).build())?
.map(|d| d.get_timestamp(TIMESTAMP_KEY).map_err(SyncError::from))
.unwrap_or_else(|| Err(SyncError::EmptyDocError))
}
pub fn get_next_batch(
oplog_coll: &Collection<Document>,
start_point: Timestamp,
end_point: Option<Timestamp>,
size: usize,
) -> Result<Vec<Document>> {
let mut filter = doc! {"ts": {"$gt": start_point}};
if let Some(end_ts) = end_point {
filter
.get_document_mut("ts")
.unwrap()
.insert("$lte", end_ts);
}
let mut result = vec![];
for doc in oplog_coll.find(
filter,
FindOptions::builder()
.sort(doc! {TIMESTAMP_KEY: 1})
.limit(size as i64)
.build(),
)? {
let d = doc?;
result.push(d);
}
Ok(result)
}
pub fn filter_oplogs(
oplogs: Vec<Document>,
db_name: &str,
valid_colls: &Option<HashSet<String>>,
) -> Vec<Document> {
oplogs
.into_iter()
.filter(|one_log| {
let (log_db_name, log_coll_name) = one_log
.get_str(NAMESPACE_KEY)
.expect("oplog should contains `ns` key")
.split_once(".")
.expect("`ns` value should be split by '.'");
if log_db_name == db_name {
match valid_colls {
Some(valid_colls) => {
(one_log
.get_str(OP_KEY)
.expect("oplog should contains `op` field")
== COMMAND_OP)
|| valid_colls.contains(log_coll_name)
}
None => true,
}
} else {
false
}
})
.collect()
}