use std::time::Duration;
use bson::{bson, doc, Document};
use super::AggregateTarget;
use crate::{
bson_util,
cmap::{CommandResponse, StreamDescription},
concern::ReadConcern,
error::{ErrorKind, WriteFailure},
operation::{test, Aggregate, Operation},
options::{AggregateOptions, Hint, StreamAddress},
Namespace,
};
fn build_test(
target: impl Into<AggregateTarget>,
pipeline: Vec<Document>,
options: Option<AggregateOptions>,
mut expected_body: Document,
) {
let target = target.into();
let aggregate = Aggregate::new(target.clone(), pipeline, options);
let mut cmd = aggregate.build(&StreamDescription::new_testing()).unwrap();
assert_eq!(cmd.name.as_str(), "aggregate");
assert_eq!(cmd.target_db.as_str(), target.db_name());
bson_util::sort_document(&mut expected_body);
bson_util::sort_document(&mut cmd.body);
assert_eq!(cmd.body, expected_body);
}
#[test]
fn build() {
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let pipeline = vec![doc! { "$match": { "x": 3 }}];
let options = AggregateOptions::builder()
.hint(Hint::Keys(doc! { "x": 1, "y": 2 }))
.bypass_document_validation(true)
.read_concern(ReadConcern::Available)
.build();
let expected_body = doc! {
"aggregate": "test_coll",
"pipeline": bson_util::to_bson_array(&pipeline),
"cursor": {},
"hint": {
"x": 1,
"y": 2,
},
"bypassDocumentValidation": true,
"readConcern": {
"level": "available"
},
};
build_test(ns, pipeline, Some(options), expected_body);
}
#[test]
fn build_batch_size() {
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let pipeline = Vec::new();
let mut expected_body = doc! {
"aggregate": "test_coll",
"pipeline": [],
"cursor": {},
};
build_test(ns.clone(), pipeline.clone(), None, expected_body.clone());
build_test(
ns.clone(),
pipeline.clone(),
Some(AggregateOptions::default()),
expected_body.clone(),
);
let batch_size_options = AggregateOptions::builder().batch_size(5).build();
expected_body.insert("cursor", doc! { "batchSize": 5 });
build_test(
ns.clone(),
pipeline,
Some(batch_size_options.clone()),
expected_body.clone(),
);
let out_pipeline = vec![doc! { "$out": "cat" }];
expected_body.insert("cursor", Document::new());
expected_body.insert("pipeline", bson_util::to_bson_array(&out_pipeline));
build_test(
ns.clone(),
out_pipeline,
Some(batch_size_options.clone()),
expected_body.clone(),
);
let merge_pipeline = vec![doc! {
"$merge": {
"into": "out",
}
}];
expected_body.insert("pipeline", bson_util::to_bson_array(&merge_pipeline));
build_test(ns, merge_pipeline, Some(batch_size_options), expected_body);
}
#[test]
fn build_target() {
let pipeline = Vec::new();
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let expected_body = doc! {
"aggregate": "test_coll",
"pipeline": [],
"cursor": {},
};
build_test(ns.clone(), pipeline.clone(), None, expected_body);
let expected_body = doc! {
"aggregate": 1,
"pipeline": [],
"cursor": {}
};
build_test(ns.db, pipeline, None, expected_body);
}
#[test]
fn build_max_await_time() {
let options = AggregateOptions::builder()
.max_await_time(Duration::from_millis(5))
.max_time(Duration::from_millis(10))
.build();
let body = doc! {
"aggregate": 1,
"cursor": {},
"maxTimeMS": 10 as i64,
"pipeline": []
};
build_test("".to_string(), Vec::new(), Some(options), body);
}
#[test]
fn op_selection_criteria() {
test::op_selection_criteria(|selection_criteria| {
let options = AggregateOptions {
selection_criteria,
..Default::default()
};
Aggregate::new("".to_string(), Vec::new(), Some(options))
});
}
#[test]
fn handle_success() {
let ns = Namespace {
db: "test_db".to_string(),
coll: "test_coll".to_string(),
};
let address = StreamAddress {
hostname: "localhost".to_string(),
port: None,
};
let aggregate = Aggregate::new(ns.clone(), Vec::new(), None);
let first_batch = vec![doc! {"_id": 1}, doc! {"_id": 2}];
let response = doc! {
"cursor": {
"id": 123,
"ns": format!("{}.{}", ns.db, ns.coll),
"firstBatch": bson_util::to_bson_array(&first_batch),
},
"ok": 1.0
};
let result = aggregate.handle_response(CommandResponse::with_document_and_address(
address.clone(),
response.clone(),
));
assert!(result.is_ok());
let cursor_spec = result.unwrap();
assert_eq!(cursor_spec.address, address);
assert_eq!(cursor_spec.id, 123);
assert_eq!(cursor_spec.batch_size, None);
assert_eq!(
cursor_spec.buffer.into_iter().collect::<Vec<Document>>(),
first_batch
);
let aggregate = Aggregate::new(
ns,
Vec::new(),
Some(
AggregateOptions::builder()
.batch_size(123)
.max_await_time(Duration::from_millis(5))
.build(),
),
);
let result = aggregate.handle_response(CommandResponse::with_document_and_address(
address.clone(),
response,
));
assert!(result.is_ok());
let cursor_spec = result.unwrap();
assert_eq!(cursor_spec.address, address);
assert_eq!(cursor_spec.id, 123);
assert_eq!(cursor_spec.batch_size, Some(123));
assert_eq!(cursor_spec.max_time, Some(Duration::from_millis(5)));
assert_eq!(
cursor_spec.buffer.into_iter().collect::<Vec<Document>>(),
first_batch
);
}
#[test]
fn handle_max_await_time() {
let response = CommandResponse::with_document_and_address(
StreamAddress::default(),
doc! {
"cursor": {
"id": 123,
"ns": "a.b",
"firstBatch": []
},
"ok": 1.0
},
);
let aggregate = Aggregate::empty();
let spec = aggregate
.handle_response(response.clone())
.expect("handle should succeed");
assert!(spec.max_time.is_none());
let max_await = Duration::from_millis(123);
let options = AggregateOptions::builder()
.max_await_time(max_await)
.build();
let aggregate = Aggregate::new(Namespace::empty(), Vec::new(), Some(options));
let spec = aggregate
.handle_response(response)
.expect("handle_should_succeed");
assert_eq!(spec.max_time, Some(max_await));
}
#[test]
fn handle_write_concern_error() {
let response = CommandResponse::with_document(doc! {
"cursor" : {
"firstBatch" : [ ],
"id" : 0 as i64,
"ns" : "test.test"
},
"writeConcernError" : {
"code" : 64,
"codeName" : "WriteConcernFailed",
"errmsg" : "waiting for replication timed out",
"errInfo" : {
"wtimeout" : true
}
},
"ok" : 1,
});
let aggregate = Aggregate::new(
Namespace::empty(),
vec![doc! { "$merge": { "into": "a" } }],
None,
);
let error = aggregate
.handle_response(response)
.expect_err("should get wc error");
match *error.kind {
ErrorKind::WriteError(WriteFailure::WriteConcernError(_)) => {}
ref e => panic!("should have gotten WriteConcernError, got {:?} instead", e),
}
}
#[test]
fn handle_invalid_response() {
let aggregate = Aggregate::empty();
let garbled = doc! { "asdfasf": "ASdfasdf" };
assert!(aggregate
.handle_response(CommandResponse::with_document(garbled))
.is_err());
let missing_cursor_field = doc! {
"cursor": {
"ns": "test.test",
"firstBatch": [],
}
};
assert!(aggregate
.handle_response(CommandResponse::with_document(missing_cursor_field))
.is_err());
}