extern crate futures;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate stitch;
extern crate tokio_core;
use std::vec::Vec;
use std::thread;
use futures::sync::mpsc::channel;
use stitch::{Message, StitchClient};
use tokio_core::reactor::Core;
const STITCH_AUTH_FIXTURE: &'static str = env!("STITCH_AUTH_FIXTURE");
const STITCH_CLIENT_ID: &'static str = env!("STITCH_CLIENT_ID");
#[derive(Debug, Clone, Serialize)]
struct Inner {
description: String,
}
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
id: u32,
name: String,
inner: Inner,
}
impl Message for TestRecord {
fn get_table_name(&self) -> String {
String::from("test_integration")
}
fn get_keys(&self) -> Vec<String> {
vec![String::from("id"), String::from("name")]
}
}
#[test]
pub fn test_buffered_stream_single_thread() {
let mut core = Core::new().unwrap();
let client_id = STITCH_CLIENT_ID.parse().unwrap();
let client = StitchClient::new(&core.handle(), client_id, STITCH_AUTH_FIXTURE).unwrap();
let r1 = TestRecord {
id: 1,
name: String::from("name_1"),
inner: Inner {
description: String::from("description_1"),
},
};
let r2 = TestRecord {
id: 2,
name: String::from("name_2"),
inner: Inner {
description: String::from("description_2"),
},
};
let r3 = TestRecord {
id: 3,
name: String::from("name_3"),
inner: Inner {
description: String::from("description_3"),
},
};
let r4 = TestRecord {
id: 4,
name: String::from("name_4"),
inner: Inner {
description: String::from("description_4"),
},
};
let r5 = TestRecord {
id: 5,
name: String::from("name_5"),
inner: Inner {
description: String::from("description_5"),
},
};
let r6 = TestRecord {
id: 6,
name: String::from("name_6"),
inner: Inner {
description: String::from("description_6"),
},
};
let r7 = TestRecord {
id: 7,
name: String::from("name_7"),
inner: Inner {
description: String::from("description_7"),
},
};
let f = client.validate_batch(vec![r1.clone(), r2.clone()]);
assert!(core.run(f).is_ok());
let (mut tx, mut rx) = channel(10);
assert!(tx.try_send(r1).is_ok());
assert!(tx.try_send(r2).is_ok());
assert!(tx.try_send(r3).is_ok());
assert!(tx.try_send(r4).is_ok());
assert!(tx.try_send(r5).is_ok());
assert!(tx.try_send(r6).is_ok());
assert!(tx.try_send(r7).is_ok());
rx.close();
let t = thread::spawn(move || {
let mut t_core = Core::new().unwrap();
let t_client = StitchClient::new(&t_core.handle(), client_id, STITCH_AUTH_FIXTURE).unwrap();
assert!(t_core.run(t_client.buffer_channel(3, rx)).is_ok());
});
assert!(t.join().is_ok());
}
#[test]
pub fn test_buffered_stream_multithreaded() {
let mut core = Core::new().unwrap();
let client_id = STITCH_CLIENT_ID.parse().unwrap();
let client = StitchClient::new(&core.handle(), client_id, STITCH_AUTH_FIXTURE).unwrap();
let r1 = TestRecord {
id: 1,
name: String::from("name_1"),
inner: Inner {
description: String::from("description_1"),
},
};
let r2 = TestRecord {
id: 2,
name: String::from("name_2"),
inner: Inner {
description: String::from("description_2"),
},
};
let r3 = TestRecord {
id: 3,
name: String::from("name_3"),
inner: Inner {
description: String::from("description_3"),
},
};
let r4 = TestRecord {
id: 4,
name: String::from("name_4"),
inner: Inner {
description: String::from("description_4"),
},
};
let r5 = TestRecord {
id: 5,
name: String::from("name_5"),
inner: Inner {
description: String::from("description_5"),
},
};
let r6 = TestRecord {
id: 6,
name: String::from("name_6"),
inner: Inner {
description: String::from("description_6"),
},
};
let r7 = TestRecord {
id: 7,
name: String::from("name_7"),
inner: Inner {
description: String::from("description_7"),
},
};
let f = client.validate_batch(vec![r1.clone(), r2.clone()]);
assert!(core.run(f).is_ok());
let (mut tx, mut rx) = channel(10);
assert!(tx.try_send(r1).is_ok());
assert!(tx.try_send(r2).is_ok());
assert!(tx.try_send(r3).is_ok());
assert!(tx.try_send(r4).is_ok());
assert!(tx.try_send(r5).is_ok());
assert!(tx.try_send(r6).is_ok());
assert!(tx.try_send(r7).is_ok());
rx.close();
assert!(core.run(client.buffer_channel(3, rx)).is_ok());
}