use std::time::Duration;
use bson::{bson, doc, Document};
use crate::{
bson_util,
cmap::{CommandResponse, StreamDescription},
operation::{test, Find, Operation},
options::{CursorType, FindOptions, Hint, ReadConcern, StreamAddress},
Namespace,
};
fn build_test(
ns: Namespace,
filter: Option<Document>,
options: Option<FindOptions>,
mut expected_body: Document,
) {
let find = Find::new(ns.clone(), filter, options);
let mut cmd = find.build(&StreamDescription::new_testing()).unwrap();
assert_eq!(cmd.name.as_str(), "find");
assert_eq!(cmd.target_db.as_str(), ns.db.as_str());
bson_util::sort_document(&mut expected_body);
bson_util::sort_document(&mut cmd.body);
assert_eq!(cmd.body, expected_body);
}
#[test]
fn build() {
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let filter = doc! {
"x": 2,
"y": { "$gt": 1 },
};
let options = FindOptions::builder()
.hint(Hint::Keys(doc! { "x": 1, "y": 2 }))
.projection(doc! { "x": 0 })
.allow_partial_results(true)
.read_concern(ReadConcern::Available)
.build();
let expected_body = doc! {
"find": "test_coll",
"filter": filter.clone(),
"hint": {
"x": 1,
"y": 2,
},
"projection": {
"x": 0
},
"allowPartialResults": true,
"readConcern": {
"level": "available"
}
};
build_test(ns, Some(filter), Some(options), expected_body);
}
#[test]
fn build_cursor_type() {
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let non_tailable_options = FindOptions::builder()
.cursor_type(CursorType::NonTailable)
.build();
let non_tailable_body = doc! {
"find": "test_coll",
};
build_test(
ns.clone(),
None,
Some(non_tailable_options),
non_tailable_body,
);
let tailable_options = FindOptions::builder()
.cursor_type(CursorType::Tailable)
.build();
let tailable_body = doc! {
"find": "test_coll",
"tailable": true
};
build_test(ns.clone(), None, Some(tailable_options), tailable_body);
let tailable_await_options = FindOptions::builder()
.cursor_type(CursorType::TailableAwait)
.build();
let tailable_await_body = doc! {
"find": "test_coll",
"tailable": true,
"awaitData": true,
};
build_test(ns, None, Some(tailable_await_options), tailable_await_body);
}
#[test]
fn build_max_await_time() {
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let options = FindOptions::builder()
.max_await_time(Duration::from_millis(5))
.max_time(Duration::from_millis(10))
.build();
let body = doc! {
"find": "test_coll",
"maxTimeMS": 10 as i64
};
build_test(ns, None, Some(options), body);
}
#[test]
fn build_limit() {
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let positive_options = FindOptions::builder().limit(5).build();
let positive_body = doc! {
"find": "test_coll",
"limit": 5 as i64
};
build_test(ns.clone(), None, Some(positive_options), positive_body);
let negative_options = FindOptions::builder().limit(-5).build();
let negative_body = doc! {
"find": "test_coll",
"limit": 5 as i64,
"singleBatch": true
};
build_test(ns, None, Some(negative_options), negative_body);
}
#[test]
fn build_batch_size() {
let options = FindOptions::builder().batch_size(1).build();
let body = doc! {
"find": "",
"batchSize": 1
};
build_test(Namespace::empty(), None, Some(options), body);
let options = FindOptions::builder()
.batch_size((std::i32::MAX as u32) + 1)
.build();
let op = Find::new(Namespace::empty(), None, Some(options));
assert!(op.build(&StreamDescription::new_testing()).is_err())
}
#[test]
fn op_selection_criteria() {
test::op_selection_criteria(|selection_criteria| {
let options = FindOptions {
selection_criteria,
..Default::default()
};
Find::new(Namespace::empty(), None, Some(options))
});
}
#[test]
fn handle_success() {
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let address = StreamAddress {
hostname: "localhost".to_string(),
port: None,
};
let find = Find::empty();
let first_batch = vec![doc! {"_id": 1}, doc! {"_id": 2}];
let response = doc! {
"cursor": {
"id": 123,
"ns": format!("{}.{}", ns.db, ns.coll),
"firstBatch": bson_util::to_bson_array(&first_batch),
},
"ok": 1.0
};
let result = find.handle_response(CommandResponse::with_document_and_address(
address.clone(),
response.clone(),
));
assert!(result.is_ok());
let cursor_spec = result.unwrap();
assert_eq!(cursor_spec.address, address);
assert_eq!(cursor_spec.id, 123);
assert_eq!(cursor_spec.batch_size, None);
assert_eq!(
cursor_spec.buffer.into_iter().collect::<Vec<Document>>(),
first_batch
);
let find = Find::new(
ns,
None,
Some(FindOptions::builder().batch_size(123).build()),
);
let result = find.handle_response(CommandResponse::with_document_and_address(
address.clone(),
response,
));
assert!(result.is_ok());
let cursor_spec = result.unwrap();
assert_eq!(cursor_spec.address, address);
assert_eq!(cursor_spec.id, 123);
assert_eq!(cursor_spec.batch_size, Some(123));
assert_eq!(
cursor_spec.buffer.into_iter().collect::<Vec<Document>>(),
first_batch
);
}
fn verify_max_await_time(max_await_time: Option<Duration>, cursor_type: Option<CursorType>) {
let ns = Namespace::empty();
let address = StreamAddress {
hostname: "localhost".to_string(),
port: None,
};
let find = Find::new(
ns,
None,
Some(FindOptions {
cursor_type,
max_await_time,
..Default::default()
}),
);
let response = CommandResponse::with_document_and_address(
address,
doc! {
"cursor": {
"id": 123,
"ns": "a.b",
"firstBatch": [],
},
"ok": 1
},
);
let spec = find
.handle_response(response)
.expect("should handle correctly");
assert_eq!(spec.max_time, max_await_time);
}
#[test]
fn handle_max_await_time() {
verify_max_await_time(None, None);
verify_max_await_time(Some(Duration::from_millis(5)), None);
verify_max_await_time(
Some(Duration::from_millis(5)),
Some(CursorType::NonTailable),
);
verify_max_await_time(Some(Duration::from_millis(5)), Some(CursorType::Tailable));
verify_max_await_time(
Some(Duration::from_millis(5)),
Some(CursorType::TailableAwait),
);
}
#[test]
fn handle_invalid_response() {
let find = Find::empty();
let garbled = doc! { "asdfasf": "ASdfasdf" };
assert!(find
.handle_response(CommandResponse::with_document(garbled))
.is_err());
let missing_cursor_field = doc! {
"cursor": {
"ns": "test.test",
"firstBatch": [],
}
};
assert!(find
.handle_response(CommandResponse::with_document(missing_cursor_field))
.is_err());
}