#![cfg(target_os = "linux")]
use bytes::Bytes;
use futures::channel;
use nix::{
sys::signal::{Signal, kill},
unistd::{ForkResult, fork},
};
use std::{thread, time::Duration};
use systemstat::{Platform, System};
use tentacle::{
ProtocolId, async_trait,
builder::{MetaBuilder, ServiceBuilder},
context::{ProtocolContext, ProtocolContextMutRef},
multiaddr::Multiaddr,
secio::SecioKeyPair,
service::{
ProtocolHandle, ProtocolMeta, Service, ServiceControl, TargetProtocol, TargetSession,
},
traits::{ServiceHandle, ServiceProtocol},
};
fn current_used_memory() -> Option<f64> {
let sys = System::new();
match sys.memory() {
Ok(mem) => Some((mem.total.as_u64() - mem.free.as_u64()) as f64),
Err(_) => None,
}
}
fn current_used_cpu() -> Option<f32> {
let sys = System::new();
match sys.cpu_load_aggregate() {
Ok(cpu) => {
thread::sleep(Duration::from_secs(1));
cpu.done().ok().map(|cpu| cpu.user)
}
Err(_) => None,
}
}
pub fn create<F>(secio: bool, meta: ProtocolMeta, shandle: F) -> Service<F, SecioKeyPair>
where
F: ServiceHandle + Unpin + 'static,
{
let builder = ServiceBuilder::default()
.insert_protocol(meta)
.forever(true);
if secio {
builder
.handshake_type(SecioKeyPair::secp256k1_generated().into())
.build(shandle)
} else {
builder.build(shandle)
}
}
struct PHandle {
connected_count: usize,
sender: crossbeam_channel::Sender<()>,
}
#[async_trait]
impl ServiceProtocol for PHandle {
async fn init(&mut self, _context: &mut ProtocolContext) {}
async fn connected(&mut self, _context: ProtocolContextMutRef<'_>, _version: &str) {
self.connected_count += 1;
assert_eq!(self.sender.send(()), Ok(()));
}
async fn disconnected(&mut self, _context: ProtocolContextMutRef<'_>) {
self.connected_count -= 1;
assert_eq!(self.sender.send(()), Ok(()));
}
async fn received(&mut self, context: ProtocolContextMutRef<'_>, data: bytes::Bytes) {
let _res = context
.filter_broadcast(TargetSession::All, context.proto_id, data)
.await;
}
}
fn create_meta(id: ProtocolId) -> (ProtocolMeta, crossbeam_channel::Receiver<()>) {
let (sender, receiver) = crossbeam_channel::bounded(1);
let meta = MetaBuilder::new()
.id(id)
.service_handle(move || {
if id == 0.into() {
ProtocolHandle::None
} else {
let handle = Box::new(PHandle {
connected_count: 0,
sender,
});
ProtocolHandle::Callback(handle)
}
})
.build();
(meta, receiver)
}
fn test_kill(secio: bool) {
let (meta, receiver) = create_meta(1.into());
let (addr_sender, addr_receiver) = channel::oneshot::channel::<Multiaddr>();
let mut service = create(secio, meta, ());
let control: ServiceControl = service.control().clone().into();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.await
.unwrap();
let _res = addr_sender.send(listen_addr);
service.run().await
});
});
thread::sleep(Duration::from_millis(100));
match unsafe { fork() } {
Err(e) => panic!("Fork failed, {}", e),
Ok(ForkResult::Parent { child }) => {
assert_eq!(receiver.recv(), Ok(()));
let _res =
control.filter_broadcast(TargetSession::All, 1.into(), Bytes::from("hello world"));
let mem_start = current_used_memory().unwrap();
let cpu_start = current_used_cpu().unwrap();
thread::sleep(Duration::from_secs(10));
assert_eq!(kill(child, Signal::SIGKILL), Ok(()));
assert_eq!(receiver.recv(), Ok(()));
let mem_stop = current_used_memory().unwrap();
let cpu_stop = current_used_cpu().unwrap();
assert!((mem_stop - mem_start) / mem_start < 0.1);
assert!((cpu_stop - cpu_start) / cpu_start < 0.1);
}
Ok(ForkResult::Child) => {
let rt = tokio::runtime::Runtime::new().unwrap();
let (meta, _receiver) = create_meta(1.into());
let mut service = create(secio, meta, ());
rt.block_on(async move {
let listen_addr = addr_receiver.await.unwrap();
service
.dial(listen_addr, TargetProtocol::All)
.await
.unwrap();
service.run().await
});
}
}
}
#[test]
fn test_kill_with_secio() {
test_kill(true)
}
#[test]
fn test_kill_with_no_secio() {
test_kill(false)
}