extern crate failure;
extern crate faktory;
extern crate mockstream;
extern crate serde_json;
extern crate url;
mod mock;
use faktory::*;
use std::io;
use std::thread;
use std::time::Duration;
/// Connecting must emit a `HELLO` whose JSON payload carries the configured
/// hostname, worker id and labels (plus a numeric `pid` and `v == 2`), and
/// dropping the consumer must emit `END`.
#[test]
fn hello() {
    let mut stream = mock::Stream::default();

    // Configure the builder one setting at a time.
    let mut builder = ConsumerBuilder::default();
    builder.hostname(String::from("host"));
    builder.wid(String::from("wid"));
    builder.labels(vec![String::from("foo"), String::from("bar")]);
    builder.register("never_called", |_| -> io::Result<()> { unreachable!() });
    let consumer = builder.connect_with(stream.clone(), None).unwrap();

    // Everything after the `HELLO ` prefix must parse as a JSON object.
    let handshake = stream.pop_bytes_written(0);
    assert!(handshake.starts_with(b"HELLO {"));
    let prefix_len = b"HELLO ".len();
    let payload: serde_json::Value = serde_json::from_slice(&handshake[prefix_len..]).unwrap();
    let fields = payload.as_object().unwrap();

    let hostname = fields.get("hostname").and_then(|v| v.as_str());
    assert_eq!(hostname, Some("host"));
    let wid = fields.get("wid").and_then(|v| v.as_str());
    assert_eq!(wid, Some("wid"));
    let pid_is_number = fields.get("pid").map(|v| v.is_number());
    assert_eq!(pid_is_number, Some(true));
    let version = fields.get("v").and_then(|v| v.as_i64());
    assert_eq!(version, Some(2));
    let labels = fields["labels"].as_array().unwrap();
    assert_eq!(labels, &["foo", "bar"]);

    // Dropping the consumer is what produces the closing `END`.
    drop(consumer);
    let farewell = stream.pop_bytes_written(0);
    assert_eq!(farewell, b"END\r\n");
}
/// When a password is supplied and the mock stream is created with
/// `with_salt`, the `HELLO` payload must contain the expected `pwdhash`.
#[test]
fn hello_pwd() {
    let mut stream = mock::Stream::with_salt(1545, "55104dc76695721d");

    let mut builder = ConsumerBuilder::default();
    builder.register("never_called", |_| -> io::Result<()> { unreachable!() });
    let password = Some(String::from("foobar"));
    let consumer = builder.connect_with(stream.clone(), password).unwrap();

    // Parse the JSON object that follows the `HELLO ` prefix.
    let handshake = stream.pop_bytes_written(0);
    assert!(handshake.starts_with(b"HELLO {"));
    let prefix_len = b"HELLO ".len();
    let payload: serde_json::Value = serde_json::from_slice(&handshake[prefix_len..]).unwrap();
    let fields = payload.as_object().unwrap();

    let pwdhash = fields.get("pwdhash").and_then(|v| v.as_str());
    assert_eq!(
        pwdhash,
        Some("6d877f8e5544b1f2598768f817413ab8a357afffa924dedae99eb91472d4ec30")
    );

    drop(consumer);
}
/// A single `run_one` call must fetch the queued job, hand it to the
/// registered `foobar` handler, and acknowledge it.
///
/// The mock connection is primed with one length-prefixed job payload
/// followed by an OK reply, so the expected wire traffic is exactly one
/// `FETCH` followed by one `ACK` for `foojid`.
#[test]
fn dequeue() {
    let mut s = mock::Stream::default();
    let mut c = ConsumerBuilder::default();
    c.register("foobar", |job: Job| -> io::Result<()> {
        assert_eq!(job.args(), &["z"]);
        Ok(())
    });
    let mut c = c.connect_with(s.clone(), None).unwrap();
    s.ignore(0);

    // `$188` is the byte length of the JSON document that follows.
    s.push_bytes_to_read(
        0,
        b"$188\r\n\
        {\
        \"jid\":\"foojid\",\
        \"queue\":\"default\",\
        \"jobtype\":\"foobar\",\
        \"args\":[\"z\"],\
        \"created_at\":\"2017-11-01T21:02:35.772981326Z\",\
        \"enqueued_at\":\"2017-11-01T21:02:35.773318394Z\",\
        \"reserve_for\":600,\
        \"retry\":25\
        }\r\n",
    );
    s.ok(0);

    // Surface the error in the panic message instead of printing it and
    // hitting `unreachable!()`, and make sure a job was really processed
    // (`run_one` yields `false` when there was nothing to fetch, as
    // `dequeue_first_empty` exercises).
    let did_work = c
        .run_one(0, &["default"])
        .expect("run_one should succeed against the mock server");
    assert!(did_work, "run_one reported that no job was processed");

    let written = s.pop_bytes_written(0);
    assert_eq!(
        written,
        &b"FETCH default\r\n\
        ACK {\"jid\":\"foojid\"}\r\n"[..]
    );
}
/// The first fetch is answered with an empty reply (`$0`), so the first
/// `run_one` must report that no work was done; the second fetch returns a
/// real job, which must be processed and acknowledged.
#[test]
fn dequeue_first_empty() {
    let mut stream = mock::Stream::default();

    let mut builder = ConsumerBuilder::default();
    builder.register("foobar", |job: Job| -> io::Result<()> {
        assert_eq!(job.args(), &["z"]);
        Ok(())
    });
    let mut consumer = builder.connect_with(stream.clone(), None).unwrap();
    stream.ignore(0);

    // Empty reply first, then one 188-byte job payload.
    stream.push_bytes_to_read(
        0,
        b"$0\r\n\r\n$188\r\n\
        {\
        \"jid\":\"foojid\",\
        \"queue\":\"default\",\
        \"jobtype\":\"foobar\",\
        \"args\":[\"z\"],\
        \"created_at\":\"2017-11-01T21:02:35.772981326Z\",\
        \"enqueued_at\":\"2017-11-01T21:02:35.773318394Z\",\
        \"reserve_for\":600,\
        \"retry\":25\
        }\r\n",
    );
    stream.ok(0);

    // First attempt: nothing to do.
    let first_did_work = match consumer.run_one(0, &["default"]) {
        Ok(flag) => flag,
        Err(e) => {
            println!("{:?}", e);
            unreachable!()
        }
    };
    assert!(!first_did_work);

    // Second attempt: the job is picked up.
    let second_did_work = match consumer.run_one(0, &["default"]) {
        Ok(flag) => flag,
        Err(e) => {
            println!("{:?}", e);
            unreachable!()
        }
    };
    assert!(second_did_work);

    let sent = stream.pop_bytes_written(0);
    assert_eq!(
        sent,
        &b"\
        FETCH default\r\n\
        FETCH default\r\n\
        ACK {\"jid\":\"foojid\"}\r\n\
        "[..]
    );
}
/// Runs a consumer on a background thread against two mock connections.
///
/// Connection 0 is fed a `quiet` and then a `terminate` state reply and is
/// expected to have received two `BEAT`s and an `END`; connection 1 serves a
/// single job and is expected to end with `FETCH`, `ACK`, `END`. `run` must
/// return `0` (presumably the number of workers still busy at shutdown).
#[test]
fn well_behaved() {
    let mut stream = mock::Stream::new(2);

    let mut builder = ConsumerBuilder::default();
    builder.wid(String::from("wid"));
    builder.register("foobar", |_| -> io::Result<()> {
        // The handler takes a while so that the job is still in flight
        // while the state replies arrive.
        thread::sleep(Duration::from_secs(7));
        Ok(())
    });
    let mut consumer = builder.connect_with(stream.clone(), None).unwrap();
    stream.ignore(0);

    // The one job the worker connection will hand out.
    stream.push_bytes_to_read(
        1,
        b"$182\r\n\
        {\
        \"jid\":\"jid\",\
        \"queue\":\"default\",\
        \"jobtype\":\"foobar\",\
        \"args\":[],\
        \"created_at\":\"2017-11-01T21:02:35.772981326Z\",\
        \"enqueued_at\":\"2017-11-01T21:02:35.773318394Z\",\
        \"reserve_for\":600,\
        \"retry\":25\
        }\r\n",
    );

    let runner = thread::spawn(move || consumer.run(&["default"]));

    // Replies are queued in this exact order: quiet, job OK, terminate.
    stream.push_bytes_to_read(0, b"+{\"state\":\"quiet\"}\r\n");
    stream.ok(1);
    stream.push_bytes_to_read(0, b"+{\"state\":\"terminate\"}\r\n");

    let still_running = runner.join().unwrap().unwrap();
    assert_eq!(still_running, 0);

    // Connection 0: exactly two heartbeats and the final END.
    let beat_bytes = stream.pop_bytes_written(0);
    let expected_beats = "\
        BEAT {\"wid\":\"wid\"}\r\n\
        BEAT {\"wid\":\"wid\"}\r\n\
        END\r\n";
    assert_eq!(std::str::from_utf8(&beat_bytes[..]).unwrap(), expected_beats);

    // Connection 1: only the tail of the traffic is compared.
    let worker_bytes = stream.pop_bytes_written(1);
    let expected_tail = "\r\n\
        FETCH default\r\n\
        ACK {\"jid\":\"jid\"}\r\n\
        END\r\n";
    let tail_start = worker_bytes.len() - expected_tail.len();
    assert_eq!(
        std::str::from_utf8(&worker_bytes[tail_start..]).unwrap(),
        expected_tail
    );
}
/// Same scenario as a normal quiet/terminate shutdown, except that the
/// worker connection first answers with an empty reply (`$0`) before
/// delivering a job, so two `FETCH`es are expected ahead of the `ACK`.
#[test]
fn no_first_job() {
    let mut stream = mock::Stream::new(2);

    let mut builder = ConsumerBuilder::default();
    builder.wid(String::from("wid"));
    builder.register("foobar", |_| -> io::Result<()> {
        thread::sleep(Duration::from_secs(7));
        Ok(())
    });
    let mut consumer = builder.connect_with(stream.clone(), None).unwrap();
    stream.ignore(0);

    // Empty reply first, then one 182-byte job payload.
    stream.push_bytes_to_read(
        1,
        b"$0\r\n\r\n$182\r\n\
        {\
        \"jid\":\"jid\",\
        \"queue\":\"default\",\
        \"jobtype\":\"foobar\",\
        \"args\":[],\
        \"created_at\":\"2017-11-01T21:02:35.772981326Z\",\
        \"enqueued_at\":\"2017-11-01T21:02:35.773318394Z\",\
        \"reserve_for\":600,\
        \"retry\":25\
        }\r\n",
    );

    let runner = thread::spawn(move || consumer.run(&["default"]));

    // Replies are queued in this exact order: quiet, job OK, terminate.
    stream.push_bytes_to_read(0, b"+{\"state\":\"quiet\"}\r\n");
    stream.ok(1);
    stream.push_bytes_to_read(0, b"+{\"state\":\"terminate\"}\r\n");

    let still_running = runner.join().unwrap().unwrap();
    assert_eq!(still_running, 0);

    // Connection 0: exactly two heartbeats and the final END.
    let beat_bytes = stream.pop_bytes_written(0);
    let expected_beats = "\
        BEAT {\"wid\":\"wid\"}\r\n\
        BEAT {\"wid\":\"wid\"}\r\n\
        END\r\n";
    assert_eq!(std::str::from_utf8(&beat_bytes[..]).unwrap(), expected_beats);

    // Connection 1: only the tail of the traffic is compared.
    let worker_bytes = stream.pop_bytes_written(1);
    let expected_tail = "\r\n\
        FETCH default\r\n\
        FETCH default\r\n\
        ACK {\"jid\":\"jid\"}\r\n\
        END\r\n";
    let tail_start = worker_bytes.len() - expected_tail.len();
    assert_eq!(
        std::str::from_utf8(&worker_bytes[tail_start..]).unwrap(),
        expected_tail
    );
}
/// Two-worker variant of the quiet/terminate shutdown scenario: three mock
/// connections are used, connection 0 for the `BEAT` traffic and
/// connections 1 and 2 for one job each.
#[test]
fn well_behaved_many() {
    let mut stream = mock::Stream::new(3);

    let mut builder = ConsumerBuilder::default();
    builder.workers(2);
    builder.wid(String::from("wid"));
    builder.register("foobar", |_| -> io::Result<()> {
        thread::sleep(Duration::from_secs(7));
        Ok(())
    });
    let mut consumer = builder.connect_with(stream.clone(), None).unwrap();
    stream.ignore(0);

    // Both worker connections hand out the same job payload.
    let job = b"$182\r\n\
        {\
        \"jid\":\"jid\",\
        \"queue\":\"default\",\
        \"jobtype\":\"foobar\",\
        \"args\":[],\
        \"created_at\":\"2017-11-01T21:02:35.772981326Z\",\
        \"enqueued_at\":\"2017-11-01T21:02:35.773318394Z\",\
        \"reserve_for\":600,\
        \"retry\":25\
        }\r\n";
    for conn in 1..3 {
        stream.push_bytes_to_read(conn, job);
    }

    let runner = thread::spawn(move || consumer.run(&["default"]));

    // Replies are queued in this exact order: quiet, both job OKs, terminate.
    stream.push_bytes_to_read(0, b"+{\"state\":\"quiet\"}\r\n");
    stream.ok(1);
    stream.ok(2);
    stream.push_bytes_to_read(0, b"+{\"state\":\"terminate\"}\r\n");

    let still_running = runner.join().unwrap().unwrap();
    assert_eq!(still_running, 0);

    // Connection 0: exactly two heartbeats and the final END.
    let beat_bytes = stream.pop_bytes_written(0);
    let expected_beats = "\
        BEAT {\"wid\":\"wid\"}\r\n\
        BEAT {\"wid\":\"wid\"}\r\n\
        END\r\n";
    assert_eq!(std::str::from_utf8(&beat_bytes[..]).unwrap(), expected_beats);

    // Each worker connection must end with FETCH, ACK, END.
    for conn in 1..3 {
        let worker_bytes = stream.pop_bytes_written(conn);
        let expected_tail = "\r\n\
            FETCH default\r\n\
            ACK {\"jid\":\"jid\"}\r\n\
            END\r\n";
        let tail_start = worker_bytes.len() - expected_tail.len();
        assert_eq!(
            std::str::from_utf8(&worker_bytes[tail_start..]).unwrap(),
            expected_tail
        );
    }
}
// Checks what happens when a `terminate` state reply arrives while a job
// handler is still running: `run` must return 1, connection 0 must have seen
// a BEAT followed by a FAIL for the in-flight job and then END, and
// connection 1 must have seen nothing after its FETCH.
#[test]
fn terminate() {
// Two mock connections: index 0 ends up carrying BEAT/FAIL/END, index 1 the
// FETCH (see the assertions at the bottom).
let mut s = mock::Stream::new(2);
let mut c = ConsumerBuilder::default();
c.wid("wid".to_string());
// This handler never returns, so the job can never complete on its own.
c.register("foobar", |_| -> io::Result<()> {
loop {
thread::sleep(Duration::from_secs(5));
}
});
let mut c = c.connect_with(s.clone(), None).unwrap();
// NOTE(review): presumably discards what was written to connection 0 during
// connect (the handshake) so the later comparison starts at the first BEAT;
// confirm against mock::Stream.
s.ignore(0);
// One job with jid "forever" for connection 1; `$186` is the byte length of
// the JSON document that follows.
s.push_bytes_to_read(
1,
b"$186\r\n\
{\
\"jid\":\"forever\",\
\"queue\":\"default\",\
\"jobtype\":\"foobar\",\
\"args\":[],\
\"created_at\":\"2017-11-01T21:02:35.772981326Z\",\
\"enqueued_at\":\"2017-11-01T21:02:35.773318394Z\",\
\"reserve_for\":600,\
\"retry\":25\
}\r\n",
);
// Run the consumer on its own thread; `c` is moved into the closure.
let jh = thread::spawn(move || c.run(&["default"]));
// The only reply queued on connection 0 is a `terminate` state.
s.push_bytes_to_read(0, b"+{\"state\":\"terminate\"}\r\n");
// NOTE(review): the 1 is presumably the count of workers still running when
// `run` gave up; confirm against the documentation of `run`.
assert_eq!(jh.join().unwrap().unwrap(), 1);
// Connection 0 must start with one BEAT immediately followed by `FAIL ` ...
let written = s.pop_bytes_written(0);
let beat = b"BEAT {\"wid\":\"wid\"}\r\nFAIL ";
assert_eq!(&written[0..beat.len()], &beat[..]);
// ... and finish with END.
assert!(written.ends_with(b"\r\nEND\r\n"));
// Debug aid: print the FAIL payload, i.e. the bytes between the `FAIL `
// prefix and the trailing `\r\nEND\r\n`.
println!(
"{}",
std::str::from_utf8(&written[beat.len()..(written.len() - b"\r\nEND\r\n".len())]).unwrap()
);
// The same byte range must parse as JSON and name the job that was running.
let written: serde_json::Value =
serde_json::from_slice(&written[beat.len()..(written.len() - b"\r\nEND\r\n".len())])
.unwrap();
assert_eq!(
written
.as_object()
.and_then(|o| o.get("jid"))
.and_then(|v| v.as_str()),
Some("forever")
);
// Connection 1: only the tail is compared, and it must be the FETCH with
// nothing after it (no ACK and no END on this connection).
let written = s.pop_bytes_written(1);
let msgs = "\r\n\
FETCH default\r\n";
assert_eq!(
std::str::from_utf8(&written[(written.len() - msgs.len())..]).unwrap(),
msgs
);
}