use heapless::spsc::Queue;
use libc::{c_char, c_int, c_uint, c_ulonglong, c_void};
use std::{
cell::RefCell,
collections::HashMap,
ffi::CStr,
ffi::CString,
ptr,
rc::Rc,
sync::{
self,
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread,
};
/// Events passed to channel handlers.
///
/// Values `1..=64` are declared with explicit power-of-two discriminants and
/// are the ones received from the native callback (`extern "C" fn(*mut c_void,
/// ChannelCbEvent)`). The last three variants are only ever produced by the
/// Rust code in this file (`Loop::accept`, `Loop::connect`, `Loop::_acceptor_cb`).
///
/// NOTE(review): the enum is received by value from C; presumably the native
/// library only ever sends one of the listed discriminants — confirm.
#[allow(dead_code)]
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelCbEvent {
    /// Forwarded to the handler of a connecting channel by `Loop::_client_cb`.
    ChannelCbEventConnect = 1,
    /// Raised on the listening channel: by `Loop::accept` once listening
    /// succeeded, and received natively when a new client arrives.
    ChannelCbEventAccept = 2,
    /// Forwarded by `Loop::_client_cb`; handlers in this file read the stream on it.
    ChannelCbEventRecv = 4,
    /// Not forwarded by any code in this file.
    ChannelCbEventSend = 8,
    /// The channel was closed; `Loop::_client_cb` unregisters the channel afterwards.
    ChannelCbEventClose = 16,
    /// Not forwarded by any code in this file.
    ChannelCbEventTimeout = 32,
    /// Forwarded to the handler of a connecting channel by `Loop::_client_cb`.
    ChannelCbEventConnectTimeout = 64,
    /// Rust-side only: `Loop::accept` could not start listening.
    ChannelCbEventAcceptFailed = 65,
    /// Rust-side only: a new client channel was accepted by a listener.
    ChannelCbEventAcceptClient = 66,
    /// Rust-side only: `Loop::connect` could not start connecting.
    ChannelCbEventConnectFailed = 67,
}
// Raw bindings to the native knet library.
//
// Only the signatures are established here. The behavioural notes are inferred
// from how the rest of this file uses each function.
// NOTE(review): confirm every note against the knet C headers.
#[allow(dead_code)]
extern "C" {
    // ---- event loop ----

    /// Creates a native loop; callers in this file treat null as failure.
    pub fn knet_loop_create() -> *mut c_void;

    /// Destroys a loop previously returned by `knet_loop_create`.
    pub fn knet_loop_destroy(c_loop: *mut c_void);

    /// Creates a channel owned by `c_loop`; callers treat null as failure.
    pub fn knet_loop_create_channel(
        c_loop: *mut c_void,
        max_send_list_len: c_uint,
        recv_ring_len: c_uint,
    ) -> *mut c_void;

    /// Runs one iteration of the loop; the return value is ignored in this file.
    pub fn knet_loop_run_once(c_loop: *mut c_void) -> c_int;

    // ---- channel ----

    /// Starts listening on `ip:port`; callers treat `0` as success.
    pub fn knet_channel_ref_accept(
        c_channel: *mut c_void,
        ip: *const c_char,
        port: c_int,
        backlog: c_int,
    ) -> c_int;

    /// Starts connecting to `ip:port`; callers treat `0` as success.
    pub fn knet_channel_ref_connect(
        c_channel: *mut c_void,
        ip: *const c_char,
        port: c_int,
        timeout: c_int,
    ) -> c_int;

    /// Closes the channel.
    pub fn knet_channel_ref_close(c_channel: *mut c_void);

    /// Returns the channel's stream handle; callers check it for null.
    pub fn knet_channel_ref_get_stream(c_channel: *mut c_void) -> *mut c_void;

    /// Installs the native event callback for the channel.
    pub fn knet_channel_ref_set_cb(
        c_channel: *mut c_void,
        cb: extern "C" fn(*mut c_void, ChannelCbEvent),
    );

    /// Returns the channel identifier used as the registry key in this file.
    pub fn knet_channel_ref_get_uuid(c_channel: *mut c_void) -> c_ulonglong;

    /// Returns an address handle usable with `address_get_ip` / `address_get_port`.
    pub fn knet_channel_ref_get_peer_address(c_channel: *mut c_void) -> *mut c_void;

    /// Returns an address handle usable with `address_get_ip` / `address_get_port`.
    pub fn knet_channel_ref_get_local_address(c_channel: *mut c_void) -> *mut c_void;

    /// Reads back the user pointer stored with `knet_channel_ref_set_ptr`.
    pub fn knet_channel_ref_get_ptr(c_channel: *mut c_void) -> *mut c_void;

    /// Stores an opaque user pointer on the channel.
    pub fn knet_channel_ref_set_ptr(c_channel: *mut c_void, data: *mut c_void);

    // ---- stream ----

    /// Number of readable bytes, as used by `Channel::available`.
    pub fn knet_stream_available(stream: *mut c_void) -> c_int;

    /// Unused in this file — presumably discards `size` bytes; verify.
    pub fn knet_stream_eat(stream: *mut c_void, size: c_int) -> c_int;

    /// Moves `size` bytes into `buffer`; callers treat `0` as success.
    pub fn knet_stream_pop(stream: *mut c_void, buffer: *mut c_void, size: c_int) -> c_int;

    /// Queues `size` bytes from `buffer` for sending; callers treat `0` as success.
    pub fn knet_stream_push(stream: *mut c_void, buffer: *const c_void, size: c_int) -> c_int;

    /// Copies `size` bytes into `buffer`; callers treat `0` as success.
    /// Presumably leaves the data in the stream — verify.
    pub fn knet_stream_copy(stream: *mut c_void, buffer: *mut c_void, size: c_int) -> c_int;

    // ---- address ----

    /// Returns the IP of an address handle as a C string.
    pub fn address_get_ip(address: *mut c_void) -> *const c_char;

    /// Returns the port of an address handle.
    pub fn address_get_port(address: *mut c_void) -> c_int;
}
/// Reference-counted, interior-mutable event handler of a [`Channel`].
/// It is invoked with the channel the event belongs to and the event itself.
type ChannelCallBack = Rc<RefCell<dyn FnMut(&mut Channel, ChannelCbEvent)>>;
/// Rust-side view of one native knet channel.
pub struct Channel {
    /// Raw handle handed to every `knet_channel_ref_*` call.
    channel_ptr: *mut c_void,
    /// Set once the channel was closed or disposed; most methods then become no-ops.
    close_flag: bool,
    /// Handler invoked for events on this channel.
    cb: ChannelCallBack,
}
impl Channel {
#[allow(dead_code)]
pub fn close(self: &mut Channel) {
if self.close_flag {
return;
}
self.close_flag = true;
unsafe {
knet_channel_ref_close(self.channel_ptr);
}
}
#[allow(dead_code)]
pub fn is_close(self: &mut Channel) -> bool {
return self.close_flag;
}
#[allow(dead_code)]
#[doc(hidden)] fn set_dispose(self: &mut Channel) {
self.close_flag = true;
}
#[allow(dead_code)]
pub fn get_uuid(self: &mut Channel) -> u64 {
unsafe {
return knet_channel_ref_get_uuid(self.channel_ptr);
}
}
#[allow(dead_code)]
pub fn available(self: &mut Channel) -> i32 {
if self.close_flag {
return 0;
}
unsafe {
let stream = knet_channel_ref_get_stream(self.channel_ptr);
if stream.is_null() {
return 0;
} else {
return knet_stream_available(stream);
}
}
}
#[allow(dead_code)]
pub fn set_handler(self: &mut Channel, cb: impl FnMut(&mut Channel, ChannelCbEvent) + 'static) {
self.cb = Rc::new(RefCell::new(cb));
}
#[allow(dead_code)]
pub fn read(self: &mut Channel, buffer: &mut Vec<u8>, size: i32) -> i32 {
if self.close_flag {
return 0;
}
unsafe {
let stream = knet_channel_ref_get_stream(self.channel_ptr);
if stream.is_null() {
return 0;
} else {
let bytes = knet_stream_available(stream);
if bytes > size {
return 0;
}
if 0 == knet_stream_pop(stream, buffer.as_mut_ptr() as *mut c_void, size) {
return size;
} else {
return 0;
}
}
}
}
#[allow(dead_code)]
pub fn copy(self: &mut Channel, buffer: &mut Vec<u8>, size: i32) -> i32 {
if self.close_flag {
return 0;
}
unsafe {
let stream = knet_channel_ref_get_stream(self.channel_ptr);
if stream.is_null() {
return 0;
} else {
let bytes = knet_stream_available(stream);
if bytes > size {
return 0;
}
if 0 == knet_stream_copy(stream, buffer.as_mut_ptr() as *mut c_void, size) {
return size;
} else {
return 0;
}
}
}
}
#[allow(dead_code)]
pub fn write(self: &mut Channel, buffer: &Vec<u8>) -> i32 {
if self.close_flag {
return 0;
}
let size = buffer.len() as i32;
unsafe {
let stream = knet_channel_ref_get_stream(self.channel_ptr);
if stream.is_null() {
return 0;
} else {
match knet_stream_push(stream, buffer.as_ptr() as *mut c_void, size) {
0 => size,
_ => 0,
}
}
}
}
#[allow(dead_code)]
pub fn get_local_address(self: &mut Channel) -> (&'static str, i32) {
if self.close_flag {
return ("", 0);
} else {
unsafe {
let addr = knet_channel_ref_get_local_address(self.channel_ptr);
let ip = address_get_ip(addr);
let port = address_get_port(addr);
(CStr::from_ptr(ip).to_str().unwrap(), port)
}
}
}
#[allow(dead_code)]
pub fn get_peer_address(self: &mut Channel) -> (&'static str, i32) {
if self.close_flag {
return ("", 0);
} else {
unsafe {
let addr = knet_channel_ref_get_peer_address(self.channel_ptr);
let ip = address_get_ip(addr);
let port = address_get_port(addr);
(CStr::from_ptr(ip).to_str().unwrap(), port)
}
}
}
}
/// Handle to a native knet event loop.
///
/// NOTE(review): the type derives `Clone` and also has a `Drop` impl that
/// calls `destroy`, which decrements a per-thread counter; cloning a `Loop`
/// therefore looks like it can unbalance that counter — confirm before relying
/// on clones.
#[derive(Debug, Clone)]
pub struct Loop {
    /// Raw loop handle returned by `knet_loop_create`.
    loop_ptr: *mut c_void,
    /// Set by `destroy`; guards against destroying twice.
    close_flag: bool,
}
/// Dropping a `Loop` runs the same teardown as an explicit `destroy` call.
impl Drop for Loop {
fn drop(&mut self) {
// `destroy` checks `close_flag`, so an earlier explicit `destroy` makes this a no-op.
self.destroy();
}
}
/// Per-thread bookkeeping shared by every [`Loop`] created on that thread.
pub struct CLoopWrapper {
    /// Handle of the first loop created on the thread; null when none exists.
    pub loop_ptr: *mut c_void,
    /// Number of `Loop` values created and not yet destroyed.
    pub count: i32,
    /// Registered channels keyed by their native UUID.
    channel_map: HashMap<u64, Rc<RefCell<Channel>>>,
}
// Thread-local registry used by `Loop` and the native callbacks. It starts
// with no loop pointer, a zero loop count and an empty channel map.
thread_local! {
static LOOP_GLOBAL_PTR: RefCell<CLoopWrapper> =
RefCell::new(CLoopWrapper{
loop_ptr: ptr::null_mut(),
count: 0,
channel_map: HashMap::new(),
});
}
impl Loop {
    /// Creates a native loop and registers it in the thread-local registry.
    ///
    /// Returns `None` when the native library fails to create a loop.
    ///
    /// NOTE(review): every call creates its own native loop, yet `destroy`
    /// only releases a native loop when the per-thread counter reaches zero,
    /// so several live loops on one thread look like they leak all but the
    /// last one destroyed — confirm the intended ownership model.
    #[allow(dead_code)]
    pub fn new() -> Option<Loop> {
        // SAFETY: plain constructor call; the result is checked for null.
        let loop_ptr = unsafe { knet_loop_create() };
        if loop_ptr.is_null() {
            return None;
        }
        LOOP_GLOBAL_PTR.with(|p| {
            let mut registry = p.borrow_mut();
            registry.count += 1;
            // Only the first loop of the thread is remembered globally.
            if registry.loop_ptr.is_null() {
                registry.loop_ptr = loop_ptr;
            }
        });
        Some(Loop {
            loop_ptr,
            close_flag: false,
        })
    }

    /// Tears the loop down: clears the channel registry, decrements the loop
    /// counter and destroys the native loop when the counter reaches zero.
    /// Calling it again is a no-op.
    #[allow(dead_code)]
    pub fn destroy(self: &mut Loop) {
        if self.close_flag {
            return;
        }
        self.close_flag = true;
        let was_last = LOOP_GLOBAL_PTR.with(|p| {
            let mut registry = p.borrow_mut();
            registry.channel_map.clear();
            registry.count -= 1;
            registry.count == 0
        });
        if was_last {
            // The registry is not borrowed here, so native callbacks fired
            // during destruction can still access it.
            // SAFETY: `loop_ptr` came from `knet_loop_create` and
            // `close_flag` prevents a second destruction through this handle.
            unsafe { knet_loop_destroy(self.loop_ptr) };
            LOOP_GLOBAL_PTR.with(|p| p.borrow_mut().loop_ptr = ptr::null_mut());
        }
    }

    /// Runs one iteration of the native loop unless the loop was destroyed.
    #[allow(dead_code)]
    pub fn tick(self: &Loop) {
        if self.close_flag {
            return;
        }
        // SAFETY: the loop has not been destroyed through this handle.
        unsafe {
            knet_loop_run_once(self.loop_ptr);
        }
    }

    /// Closes the registered channel `chan_id`; unknown ids are ignored.
    #[allow(dead_code)]
    pub fn close(self: &Loop, chan_id: u64) {
        if let Some(channel) = Loop::find_channel(chan_id) {
            channel.as_ref().borrow_mut().close();
        }
    }

    /// Writes `data` to the registered channel `chan_id`; unknown ids are
    /// ignored and the write result is discarded.
    #[allow(dead_code)]
    pub fn send(self: &Loop, chan_id: u64, data: &Vec<u8>) {
        if let Some(channel) = Loop::find_channel(chan_id) {
            channel.as_ref().borrow_mut().write(data);
        }
    }

    /// Starts listening on `ip:port`.
    ///
    /// On success the channel is registered, `cb` receives
    /// `ChannelCbEventAccept` and the channel is returned. On failure
    /// (including an `ip` containing a NUL byte) `cb` receives
    /// `ChannelCbEventAcceptFailed`, the native channel is closed and `None`
    /// is returned. `None` is also returned, without invoking `cb`, when the
    /// loop was destroyed or no native channel could be created.
    ///
    /// `cb` is shared with every client channel accepted by this listener.
    #[allow(dead_code)]
    pub fn accept(
        self: &Loop,
        ip: &str,
        port: i32,
        backlog: i32,
        cb: impl FnMut(&mut Channel, ChannelCbEvent) + 'static,
    ) -> Option<Rc<RefCell<Channel>>> {
        if self.close_flag {
            return None;
        }
        // SAFETY: the loop has not been destroyed through this handle.
        let channel_ptr = unsafe { knet_loop_create_channel(self.loop_ptr, 1024, 1024) };
        if channel_ptr.is_null() {
            return None;
        }
        // SAFETY: `channel_ptr` was just checked to be non-null.
        let accept_chan_id = unsafe {
            knet_channel_ref_set_cb(channel_ptr, Loop::_acceptor_cb);
            let id = knet_channel_ref_get_uuid(channel_ptr);
            // The listener's UUID travels as the user pointer so that
            // `_acceptor_cb` can find the listener for each accepted client.
            knet_channel_ref_set_ptr(channel_ptr, id as *mut c_void);
            id
        };
        let listening = match CString::new(ip) {
            // SAFETY: `c_ip` outlives the call and is NUL-terminated.
            Ok(c_ip) => unsafe {
                knet_channel_ref_accept(channel_ptr, c_ip.as_ptr(), port, backlog) == 0
            },
            // An address with an interior NUL can never be bound.
            Err(_) => false,
        };
        let channel = Loop::register(channel_ptr, accept_chan_id, !listening, cb);
        if listening {
            Loop::dispatch(&channel, ChannelCbEvent::ChannelCbEventAccept);
            Some(channel)
        } else {
            Loop::dispatch(&channel, ChannelCbEvent::ChannelCbEventAcceptFailed);
            // SAFETY: `channel_ptr` is non-null and has not been closed yet.
            unsafe { knet_channel_ref_close(channel_ptr) };
            None
        }
    }

    /// Starts connecting to `ip:port`.
    ///
    /// On success the channel is registered and returned; later events reach
    /// `cb` through the native callback. On failure (including an `ip`
    /// containing a NUL byte) `cb` receives `ChannelCbEventConnectFailed`, the
    /// native channel is closed and `None` is returned. `None` is also
    /// returned, without invoking `cb`, when the loop was destroyed or no
    /// native channel could be created.
    #[allow(dead_code)]
    pub fn connect(
        self: &Loop,
        ip: &str,
        port: i32,
        timeout: i32,
        cb: impl FnMut(&mut Channel, ChannelCbEvent) + 'static,
    ) -> Option<Rc<RefCell<Channel>>> {
        // Same guard as `accept`: never hand a destroyed loop to the library.
        if self.close_flag {
            return None;
        }
        // SAFETY: the loop has not been destroyed through this handle.
        let channel_ptr = unsafe { knet_loop_create_channel(self.loop_ptr, 1024, 1024) };
        if channel_ptr.is_null() {
            return None;
        }
        // SAFETY: `channel_ptr` was just checked to be non-null.
        unsafe { knet_channel_ref_set_cb(channel_ptr, Loop::_client_cb) };
        let connecting = match CString::new(ip) {
            // SAFETY: `c_ip` outlives the call and is NUL-terminated.
            Ok(c_ip) => unsafe {
                knet_channel_ref_connect(channel_ptr, c_ip.as_ptr(), port, timeout) == 0
            },
            Err(_) => false,
        };
        // SAFETY: `channel_ptr` is non-null.
        let chan_id = unsafe { knet_channel_ref_get_uuid(channel_ptr) };
        let channel = Loop::register(channel_ptr, chan_id, !connecting, cb);
        if connecting {
            Some(channel)
        } else {
            Loop::dispatch(&channel, ChannelCbEvent::ChannelCbEventConnectFailed);
            // SAFETY: `channel_ptr` is non-null and has not been closed yet.
            unsafe { knet_channel_ref_close(channel_ptr) };
            None
        }
    }

    /// Native callback for connected channels (outgoing and accepted).
    ///
    /// Connect, Recv, ConnectTimeout and Close are forwarded to the channel's
    /// handler; Close additionally marks the channel closed and unregisters
    /// it. Events for unknown channels and all other events are ignored: a
    /// panic must not unwind out of an `extern "C"` function.
    extern "C" fn _client_cb(_channel_ptr: *mut c_void, _e: ChannelCbEvent) {
        // SAFETY: the native library passes the channel the event belongs to.
        let chan_id = unsafe { knet_channel_ref_get_uuid(_channel_ptr) };
        let channel = match Loop::find_channel(chan_id) {
            Some(channel) => channel,
            None => return,
        };
        match _e {
            ChannelCbEvent::ChannelCbEventClose => {
                Loop::finish_channel(&channel, chan_id);
            }
            ChannelCbEvent::ChannelCbEventConnect => {
                Loop::dispatch(&channel, ChannelCbEvent::ChannelCbEventConnect);
            }
            ChannelCbEvent::ChannelCbEventRecv => {
                Loop::dispatch(&channel, ChannelCbEvent::ChannelCbEventRecv);
            }
            ChannelCbEvent::ChannelCbEventConnectTimeout => {
                Loop::dispatch(&channel, ChannelCbEvent::ChannelCbEventConnectTimeout);
            }
            // Not handled by this wrapper.
            ChannelCbEvent::ChannelCbEventAccept
            | ChannelCbEvent::ChannelCbEventSend
            | ChannelCbEvent::ChannelCbEventTimeout
            | ChannelCbEvent::ChannelCbEventAcceptFailed
            | ChannelCbEvent::ChannelCbEventAcceptClient
            | ChannelCbEvent::ChannelCbEventConnectFailed => {}
        }
    }

    /// Native callback installed on listening channels.
    ///
    /// For an Accept event `_channel_ptr` is the newly accepted client
    /// channel: its user pointer carries the listener's UUID (stored by
    /// `accept`) and its own UUID becomes its registry key. The client is
    /// registered in the thread-local registry, switched to `_client_cb` and
    /// announced to the listener's handler as `ChannelCbEventAcceptClient`.
    ///
    /// A Close event for a registered listener is forwarded to its handler
    /// and the listener is unregistered. All other events are ignored.
    extern "C" fn _acceptor_cb(_channel_ptr: *mut c_void, _e: ChannelCbEvent) {
        match _e {
            ChannelCbEvent::ChannelCbEventAccept => {
                // SAFETY: the native library passes a valid channel handle.
                let (accept_chan_id, chan_id) = unsafe {
                    (
                        knet_channel_ref_get_ptr(_channel_ptr) as u64,
                        knet_channel_ref_get_uuid(_channel_ptr),
                    )
                };
                let acceptor = match Loop::find_channel(accept_chan_id) {
                    Some(acceptor) => acceptor,
                    None => {
                        // Nobody owns this connection any more; do not keep
                        // it open. NOTE(review): confirm closing from inside
                        // the callback is allowed by knet.
                        // SAFETY: `_channel_ptr` is the live accepted channel.
                        unsafe { knet_channel_ref_close(_channel_ptr) };
                        return;
                    }
                };
                // SAFETY: `_channel_ptr` is the live accepted channel.
                unsafe { knet_channel_ref_set_cb(_channel_ptr, Loop::_client_cb) };
                let handler = match acceptor.as_ref().try_borrow() {
                    Ok(listener) => listener.cb.clone(),
                    Err(_) => return,
                };
                let client = Rc::new(RefCell::new(Channel {
                    channel_ptr: _channel_ptr,
                    close_flag: false,
                    cb: handler,
                }));
                // Register in the real registry (not a copy) so later events,
                // `Loop::send` and `Loop::close` can find the client.
                LOOP_GLOBAL_PTR.with(|p| {
                    if let Ok(mut registry) = p.try_borrow_mut() {
                        registry.channel_map.insert(chan_id, client.clone());
                    }
                });
                Loop::dispatch(&client, ChannelCbEvent::ChannelCbEventAcceptClient);
            }
            ChannelCbEvent::ChannelCbEventClose => {
                // SAFETY: the native library passes a valid channel handle.
                let chan_id = unsafe { knet_channel_ref_get_uuid(_channel_ptr) };
                if let Some(listener) = Loop::find_channel(chan_id) {
                    Loop::finish_channel(&listener, chan_id);
                }
            }
            // Not handled by this wrapper.
            ChannelCbEvent::ChannelCbEventConnect
            | ChannelCbEvent::ChannelCbEventRecv
            | ChannelCbEvent::ChannelCbEventSend
            | ChannelCbEvent::ChannelCbEventTimeout
            | ChannelCbEvent::ChannelCbEventConnectTimeout
            | ChannelCbEvent::ChannelCbEventAcceptFailed
            | ChannelCbEvent::ChannelCbEventAcceptClient
            | ChannelCbEvent::ChannelCbEventConnectFailed => {}
        }
    }

    /// Looks a channel up in the thread-local registry.
    /// Returns `None` for unknown ids or when the registry is currently
    /// mutably borrowed.
    fn find_channel(chan_id: u64) -> Option<Rc<RefCell<Channel>>> {
        LOOP_GLOBAL_PTR.with(|p| {
            p.try_borrow()
                .ok()
                .and_then(|registry| registry.channel_map.get(&chan_id).cloned())
        })
    }

    /// Wraps a native channel and stores it in the registry under `chan_id`.
    fn register(
        channel_ptr: *mut c_void,
        chan_id: u64,
        close_flag: bool,
        cb: impl FnMut(&mut Channel, ChannelCbEvent) + 'static,
    ) -> Rc<RefCell<Channel>> {
        let channel = Rc::new(RefCell::new(Channel {
            channel_ptr,
            close_flag,
            cb: Rc::new(RefCell::new(cb)),
        }));
        LOOP_GLOBAL_PTR.with(|p| {
            p.borrow_mut().channel_map.insert(chan_id, channel.clone());
        });
        channel
    }

    /// Invokes the channel's current handler with `event`.
    ///
    /// The event is skipped when the channel or its handler is already
    /// borrowed (re-entrant delivery), because this runs inside native
    /// callbacks where a borrow panic would abort the process.
    fn dispatch(channel: &Rc<RefCell<Channel>>, event: ChannelCbEvent) {
        let handler = match channel.as_ref().try_borrow() {
            Ok(chan) => chan.cb.clone(),
            Err(_) => return,
        };
        let mut call = match handler.as_ref().try_borrow_mut() {
            Ok(call) => call,
            Err(_) => return,
        };
        let mut chan = match channel.as_ref().try_borrow_mut() {
            Ok(chan) => chan,
            Err(_) => return,
        };
        (&mut *call)(&mut *chan, event);
    }

    /// Close handling shared by both native callbacks: mark the channel
    /// closed, notify its handler, then drop it from the registry.
    fn finish_channel(channel: &Rc<RefCell<Channel>>, chan_id: u64) {
        if let Ok(mut chan) = channel.as_ref().try_borrow_mut() {
            chan.set_dispose();
        }
        Loop::dispatch(channel, ChannelCbEvent::ChannelCbEventClose);
        LOOP_GLOBAL_PTR.with(|p| {
            if let Ok(mut registry) = p.try_borrow_mut() {
                registry.channel_map.remove(&chan_id);
            }
        });
    }
}
/// Request queued by `KnetNetwork::listen` asking the worker to listen on an address.
#[derive(Debug, Clone)]
struct NetEventListenReq {
    /// Address to listen on.
    ip: String,
    /// Port to listen on.
    port: i32,
}
/// Outcome of a listen request, delivered to the `on_accept` handler.
#[derive(Debug, Clone)]
struct NetEventListenAck {
    /// Local address reported by the listening channel.
    ip: String,
    /// Local port reported by the listening channel.
    port: i32,
    /// Whether listening started successfully.
    success: bool,
}
/// Notification that a listener accepted a client, delivered to the `on_client` handler.
#[derive(Debug, Clone)]
struct NetEventAccept {
    /// UUID of the accepted client channel.
    chan_id: u64,
}
/// Request queued by `KnetNetwork::connect` asking the worker to connect to an address.
#[derive(Debug, Clone)]
struct NetEventConnectReq {
    /// Remote address.
    ip: String,
    /// Remote port.
    port: i32,
}
/// Outcome of a connect request, delivered to the `on_connect` handler.
#[derive(Debug, Clone)]
struct NetEventConnectAck {
    /// Local address reported by the connecting channel.
    ip: String,
    /// Local port reported by the connecting channel.
    port: i32,
    /// Whether the connection was established.
    success: bool,
    /// UUID of the connecting channel.
    chan_id: u64,
}
/// Request queued by `KnetNetwork::send` asking the worker to write data to a channel.
#[derive(Debug, Clone)]
struct NetEventSendNtf {
    /// UUID of the destination channel.
    _chan_id: u64,
    /// Bytes to write.
    _data: Vec<u8>,
}
/// Request queued by `KnetNetwork::close` asking the worker to close a channel.
#[derive(Debug, Clone)]
struct NetEventCloseReq {
    /// UUID of the channel to close.
    chan_id: u64,
}
/// Notification that a channel was closed, delivered to the `on_close` handler.
#[derive(Debug, Clone)]
struct NetEventCloseNtf {
    /// UUID of the closed channel.
    chan_id: u64,
}
/// Data received on a channel, delivered to the `on_data` handler.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct NetEventDataNtf {
    /// UUID of the channel the data arrived on.
    chan_id: u64,
    /// The received bytes.
    data: Vec<u8>,
}
/// Messages exchanged between the caller's thread and the worker thread of
/// [`KnetNetwork`].
///
/// The worker consumes `EventListenReq`, `EventConnectReq`, `EventSendNtf` and
/// `EventCloseReq`; `KnetNetwork::tick` consumes the remaining variants.
#[derive(Debug, Clone)]
enum NetEvent {
    /// Caller -> worker: start listening.
    EventListenReq(NetEventListenReq),
    /// Worker -> caller: outcome of a listen request.
    EventListenAck(NetEventListenAck),
    /// Caller -> worker: start connecting.
    EventConnectReq(NetEventConnectReq),
    /// Worker -> caller: outcome of a connect request.
    EventConnectAck(NetEventConnectAck),
    /// Caller -> worker: write data to a channel.
    EventSendNtf(NetEventSendNtf),
    /// Caller -> worker: close a channel.
    EventCloseReq(NetEventCloseReq),
    /// Worker -> caller: a channel was closed.
    EventCloseNtf(NetEventCloseNtf),
    /// Worker -> caller: a listener accepted a client.
    EventAccept(NetEventAccept),
    /// Worker -> caller: data arrived on a channel.
    EventDataNtf(NetEventDataNtf),
}
/// Facade that runs a [`Loop`] on a worker thread and exchanges [`NetEvent`]s
/// with it through two mutex-protected queues.
#[allow(dead_code)]
pub struct KnetNetwork {
    /// `true` while the worker thread is supposed to keep running.
    running: sync::Arc<AtomicBool>,
    /// Join handle of the worker thread, present after `start`.
    worker: Option<thread::JoinHandle<()>>,
    /// Requests travelling from the caller to the worker.
    net_to_worker_queue: Arc<Mutex<Queue<NetEvent, 1024>>>,
    /// Notifications travelling from the worker back to the caller.
    worker_to_net_queue: Arc<Mutex<Queue<NetEvent, 1024>>>,
    /// Handler for listen results: `(network, ip, port, success)`.
    on_accept_cb: Rc<RefCell<dyn FnMut(&KnetNetwork, &str, i32, bool)>>,
    /// Handler for connect results: `(network, chan_id, ip, port, success)`.
    on_connect_cb: Rc<RefCell<dyn FnMut(&KnetNetwork, u64, &str, i32, bool)>>,
    /// Handler for received data: `(network, chan_id, data)`.
    on_read_cb: Rc<RefCell<dyn FnMut(&KnetNetwork, u64, &Vec<u8>)>>,
    /// Handler for closed channels: `(network, chan_id)`.
    on_close_cb: Rc<RefCell<dyn FnMut(&KnetNetwork, u64)>>,
    /// Handler for newly accepted clients: `(network, chan_id)`.
    on_client_cb: Rc<RefCell<dyn FnMut(&KnetNetwork, u64)>>,
}
/// Dropping the network stops the worker thread via `stop`.
impl Drop for KnetNetwork {
fn drop(&mut self) {
// `stop` returns immediately when the network is not running.
self.stop();
}
}
impl KnetNetwork {
    /// Creates a stopped network with empty queues and no-op handlers.
    #[allow(dead_code)]
    pub fn new() -> Arc<Mutex<KnetNetwork>> {
        Arc::new(Mutex::new(KnetNetwork {
            running: sync::Arc::new(AtomicBool::new(false)),
            worker: None,
            net_to_worker_queue: Arc::new(Mutex::new(Queue::new())),
            worker_to_net_queue: Arc::new(Mutex::new(Queue::new())),
            on_accept_cb: Rc::new(RefCell::new(KnetNetwork::_default_on_accept_cb)),
            on_connect_cb: Rc::new(RefCell::new(KnetNetwork::_default_on_connect_cb)),
            on_read_cb: Rc::new(RefCell::new(KnetNetwork::_default_on_read_cb)),
            on_close_cb: Rc::new(RefCell::new(KnetNetwork::_default_on_close_cb)),
            on_client_cb: Rc::new(RefCell::new(KnetNetwork::_default_on_client_cb)),
        }))
    }

    /// Sets the handler invoked with `(network, ip, port, success)` for listen results.
    #[allow(dead_code)]
    pub fn on_accept(&mut self, cb: impl FnMut(&KnetNetwork, &str, i32, bool) + 'static) {
        self.on_accept_cb = Rc::new(RefCell::new(cb));
    }

    /// Sets the handler invoked with `(network, chan_id, ip, port, success)` for connect results.
    #[allow(dead_code)]
    pub fn on_connect(&mut self, cb: impl FnMut(&KnetNetwork, u64, &str, i32, bool) + 'static) {
        self.on_connect_cb = Rc::new(RefCell::new(cb));
    }

    /// Sets the handler invoked with `(network, chan_id, data)` for received data.
    #[allow(dead_code)]
    pub fn on_data(&mut self, cb: impl FnMut(&KnetNetwork, u64, &Vec<u8>) + 'static) {
        self.on_read_cb = Rc::new(RefCell::new(cb));
    }

    /// Sets the handler invoked with `(network, chan_id)` when a channel closes.
    #[allow(dead_code)]
    pub fn on_close(&mut self, cb: impl FnMut(&KnetNetwork, u64) + 'static) {
        self.on_close_cb = Rc::new(RefCell::new(cb));
    }

    /// Sets the handler invoked with `(network, chan_id)` when a client is accepted.
    #[allow(dead_code)]
    pub fn on_client(&mut self, cb: impl FnMut(&KnetNetwork, u64) + 'static) {
        self.on_client_cb = Rc::new(RefCell::new(cb));
    }

    /// Spawns the worker thread, which ticks a fresh [`Loop`] and processes
    /// queued requests until `stop` is called. Does nothing when already running.
    ///
    /// If the worker cannot create its loop it clears the running flag and exits.
    #[allow(dead_code)]
    pub fn start(&mut self) {
        // `swap` tests and sets the flag in a single atomic step.
        if self.running.swap(true, Ordering::SeqCst) {
            return;
        }
        let running = self.running.clone();
        let net_to_worker_queue = self.net_to_worker_queue.clone();
        let worker_to_net_queue = self.worker_to_net_queue.clone();
        self.worker = Some(thread::spawn(move || {
            let net_loop = match Loop::new() {
                Some(net_loop) => net_loop,
                None => {
                    running.store(false, Ordering::SeqCst);
                    return;
                }
            };
            while running.load(Ordering::SeqCst) {
                net_loop.tick();
                KnetNetwork::_process_worker_event(
                    net_to_worker_queue.clone(),
                    worker_to_net_queue.clone(),
                    &net_loop,
                );
            }
            running.store(false, Ordering::SeqCst);
        }));
    }

    /// Signals the worker thread to stop and waits for it to finish.
    ///
    /// # Panics
    ///
    /// Panics with "Join worker thread failed" if the worker thread panicked,
    /// unless the current thread is already unwinding (this method also runs
    /// from `Drop`, where a second panic would abort the process).
    #[allow(dead_code)]
    pub fn stop(&mut self) {
        self.running.store(false, Ordering::SeqCst);
        if let Some(worker) = self.worker.take() {
            if worker.join().is_err() && !thread::panicking() {
                panic!("Join worker thread failed");
            }
        }
    }

    /// Queues a request for the worker to listen on `ip:port`; the outcome is
    /// reported through the `on_accept` handler during `tick`.
    #[allow(dead_code)]
    pub fn listen(self: &mut KnetNetwork, ip: String, port: i32) {
        KnetNetwork::_notify_worker(
            self.net_to_worker_queue.clone(),
            NetEvent::EventListenReq(NetEventListenReq { ip, port }),
        );
    }

    /// Queues a request for the worker to connect to `ip:port`; the outcome is
    /// reported through the `on_connect` handler during `tick`.
    #[allow(dead_code)]
    pub fn connect(self: &KnetNetwork, ip: String, port: i32) {
        KnetNetwork::_notify_worker(
            self.net_to_worker_queue.clone(),
            NetEvent::EventConnectReq(NetEventConnectReq { ip, port }),
        );
    }

    /// Delivers every pending worker notification to the registered handlers
    /// on the calling thread.
    #[allow(dead_code)]
    pub fn tick(self: &KnetNetwork) {
        self._process_net_event();
    }

    /// Queues a copy of `data` to be written to channel `chan_id` by the worker.
    #[allow(dead_code)]
    pub fn send(self: &KnetNetwork, chan_id: u64, data: &Vec<u8>) {
        KnetNetwork::_notify_worker(
            self.net_to_worker_queue.clone(),
            NetEvent::EventSendNtf(NetEventSendNtf {
                _chan_id: chan_id,
                _data: data.to_vec(),
            }),
        );
    }

    /// Queues a request for the worker to close channel `chan_id`.
    #[allow(dead_code)]
    pub fn close(self: &KnetNetwork, chan_id: u64) {
        KnetNetwork::_notify_worker(
            self.net_to_worker_queue.clone(),
            NetEvent::EventCloseReq(NetEventCloseReq { chan_id }),
        );
    }

    // No-op handlers installed by `new`.
    fn _default_on_accept_cb(_self: &KnetNetwork, _1: &str, _2: i32, _3: bool) {}
    fn _default_on_connect_cb(_self: &KnetNetwork, _0: u64, _1: &str, _2: i32, _3: bool) {}
    fn _default_on_read_cb(_self: &KnetNetwork, _1: u64, _2: &Vec<u8>) {}
    fn _default_on_close_cb(_self: &KnetNetwork, _1: u64) {}
    fn _default_on_client_cb(_self: &KnetNetwork, _1: u64) {}

    /// Enqueues a notification for the caller's thread.
    ///
    /// Panics if the mutex is poisoned or the queue is full.
    fn _notify_main(queue: Arc<Mutex<Queue<NetEvent, 1024>>>, e: NetEvent) {
        queue.lock().unwrap().enqueue(e).unwrap();
    }

    /// Enqueues a request for the worker thread.
    ///
    /// Panics if the mutex is poisoned or the queue is full.
    fn _notify_worker(queue: Arc<Mutex<Queue<NetEvent, 1024>>>, e: NetEvent) {
        queue.lock().unwrap().enqueue(e).unwrap();
    }

    /// Pops one event, holding the queue lock only for the duration of the pop.
    ///
    /// Callers must not hold a queue lock while handling an event: handlers
    /// push to the opposite queue, and the two threads would otherwise acquire
    /// the two locks in opposite orders.
    fn _pop_event(queue: &Arc<Mutex<Queue<NetEvent, 1024>>>) -> Option<NetEvent> {
        queue.lock().unwrap().dequeue()
    }

    /// Reads everything currently available on `chan` and forwards it to the
    /// caller's thread as an `EventDataNtf`. Does nothing when no data could
    /// be read.
    fn _forward_received(chan: &mut Channel, queue: &Arc<Mutex<Queue<NetEvent, 1024>>>) {
        let size = chan.available();
        if size <= 0 {
            return;
        }
        let len = size as usize;
        let mut data: Vec<u8> = Vec::with_capacity(len);
        let chan_id = chan.get_uuid();
        if chan.read(&mut data, size) == 0 {
            return;
        }
        // SAFETY: `read` reported success, so `len` bytes were written into
        // the `len`-byte allocation reserved above.
        unsafe { data.set_len(len) };
        KnetNetwork::_notify_main(
            queue.clone(),
            NetEvent::EventDataNtf(NetEventDataNtf { chan_id, data }),
        );
    }

    /// Reports the result of a listen request together with the channel's local address.
    fn _notify_listen_ack(
        chan: &mut Channel,
        queue: &Arc<Mutex<Queue<NetEvent, 1024>>>,
        success: bool,
    ) {
        let (ip, port) = chan.get_local_address();
        KnetNetwork::_notify_main(
            queue.clone(),
            NetEvent::EventListenAck(NetEventListenAck {
                ip: ip.to_owned(),
                port,
                success,
            }),
        );
    }

    /// Reports the result of a connect request together with the channel's
    /// UUID and local address.
    fn _notify_connect_ack(
        chan: &mut Channel,
        queue: &Arc<Mutex<Queue<NetEvent, 1024>>>,
        success: bool,
    ) {
        let chan_id = chan.get_uuid();
        let (ip, port) = chan.get_local_address();
        KnetNetwork::_notify_main(
            queue.clone(),
            NetEvent::EventConnectAck(NetEventConnectAck {
                ip: ip.to_owned(),
                port,
                success,
                chan_id,
            }),
        );
    }

    /// Reports that `chan` was closed.
    fn _notify_closed(chan: &mut Channel, queue: &Arc<Mutex<Queue<NetEvent, 1024>>>) {
        KnetNetwork::_notify_main(
            queue.clone(),
            NetEvent::EventCloseNtf(NetEventCloseNtf {
                chan_id: chan.get_uuid(),
            }),
        );
    }

    /// Handles a listen request on the worker thread: starts listening with a
    /// backlog of 512 and translates channel events into notifications for
    /// the caller's thread.
    fn _process_listen_in_worker(
        net_loop: &Loop,
        req: &NetEventListenReq,
        net_queue: Arc<Mutex<Queue<NetEvent, 1024>>>,
    ) {
        net_loop.accept(
            &req.ip,
            req.port,
            512,
            move |_chan: &mut Channel, _e: ChannelCbEvent| match _e {
                ChannelCbEvent::ChannelCbEventRecv => {
                    KnetNetwork::_forward_received(_chan, &net_queue)
                }
                ChannelCbEvent::ChannelCbEventAccept => {
                    KnetNetwork::_notify_listen_ack(_chan, &net_queue, true)
                }
                ChannelCbEvent::ChannelCbEventAcceptFailed => {
                    KnetNetwork::_notify_listen_ack(_chan, &net_queue, false)
                }
                ChannelCbEvent::ChannelCbEventAcceptClient => KnetNetwork::_notify_main(
                    net_queue.clone(),
                    NetEvent::EventAccept(NetEventAccept {
                        chan_id: _chan.get_uuid(),
                    }),
                ),
                ChannelCbEvent::ChannelCbEventClose => {
                    KnetNetwork::_notify_closed(_chan, &net_queue)
                }
                _ => (),
            },
        );
    }

    /// Handles a connect request on the worker thread: starts connecting with
    /// a timeout argument of 512 and translates channel events into
    /// notifications for the caller's thread.
    fn _process_connect_in_worker(
        net_loop: &Loop,
        req: &NetEventConnectReq,
        net_queue: Arc<Mutex<Queue<NetEvent, 1024>>>,
    ) {
        net_loop.connect(
            &req.ip,
            req.port,
            512,
            move |_chan: &mut Channel, _e: ChannelCbEvent| match _e {
                ChannelCbEvent::ChannelCbEventConnect => {
                    KnetNetwork::_notify_connect_ack(_chan, &net_queue, true)
                }
                // A failed or timed-out connect is reported the same way.
                ChannelCbEvent::ChannelCbEventConnectFailed
                | ChannelCbEvent::ChannelCbEventConnectTimeout => {
                    KnetNetwork::_notify_connect_ack(_chan, &net_queue, false)
                }
                ChannelCbEvent::ChannelCbEventRecv => {
                    KnetNetwork::_forward_received(_chan, &net_queue)
                }
                ChannelCbEvent::ChannelCbEventClose => {
                    KnetNetwork::_notify_closed(_chan, &net_queue)
                }
                _ => (),
            },
        );
    }

    /// Drains the request queue on the worker thread.
    fn _process_worker_event(
        net_to_worker_queue: Arc<Mutex<Queue<NetEvent, 1024>>>,
        worker_to_net_queue: Arc<Mutex<Queue<NetEvent, 1024>>>,
        net_loop: &Loop,
    ) {
        loop {
            // The lock is released before the event is handled.
            let event = match KnetNetwork::_pop_event(&net_to_worker_queue) {
                Some(event) => event,
                None => return,
            };
            match event {
                NetEvent::EventListenReq(req) => {
                    KnetNetwork::_process_listen_in_worker(
                        net_loop,
                        &req,
                        worker_to_net_queue.clone(),
                    );
                }
                NetEvent::EventConnectReq(req) => {
                    KnetNetwork::_process_connect_in_worker(
                        net_loop,
                        &req,
                        worker_to_net_queue.clone(),
                    );
                }
                NetEvent::EventCloseReq(req) => {
                    net_loop.close(req.chan_id);
                    KnetNetwork::_notify_main(
                        worker_to_net_queue.clone(),
                        NetEvent::EventCloseNtf(NetEventCloseNtf {
                            chan_id: req.chan_id,
                        }),
                    );
                }
                NetEvent::EventSendNtf(ntf) => {
                    net_loop.send(ntf._chan_id, &ntf._data);
                }
                // Caller-bound notifications are never placed on this queue
                // by the code in this file; ignore them rather than kill the
                // worker thread.
                NetEvent::EventListenAck(_)
                | NetEvent::EventConnectAck(_)
                | NetEvent::EventCloseNtf(_)
                | NetEvent::EventAccept(_)
                | NetEvent::EventDataNtf(_) => {}
            }
        }
    }

    /// Drains the notification queue on the caller's thread and invokes the
    /// matching handler for each event.
    fn _process_net_event(&self) {
        loop {
            // The lock is released before a user handler runs, so handlers
            // may call `send`/`close`/`connect` freely.
            let event = match KnetNetwork::_pop_event(&self.worker_to_net_queue) {
                Some(event) => event,
                None => return,
            };
            match event {
                NetEvent::EventListenAck(ack) => {
                    (*self.on_accept_cb).borrow_mut()(self, &ack.ip, ack.port, ack.success);
                }
                NetEvent::EventConnectAck(ack) => {
                    (*self.on_connect_cb).borrow_mut()(
                        self,
                        ack.chan_id,
                        &ack.ip,
                        ack.port,
                        ack.success,
                    );
                }
                NetEvent::EventCloseNtf(ntf) => {
                    (*self.on_close_cb).borrow_mut()(self, ntf.chan_id);
                }
                NetEvent::EventAccept(ntf) => {
                    (*self.on_client_cb).borrow_mut()(self, ntf.chan_id);
                }
                NetEvent::EventDataNtf(ntf) => {
                    (*self.on_read_cb).borrow_mut()(self, ntf.chan_id, &ntf.data);
                }
                // Worker-bound requests are never placed on this queue by the
                // code in this file.
                NetEvent::EventListenReq(_)
                | NetEvent::EventConnectReq(_)
                | NetEvent::EventSendNtf(_)
                | NetEvent::EventCloseReq(_) => {}
            }
        }
    }
}
#[cfg(test)]
mod tests {
    // Integration-style tests against the real knet library.
    //
    // Every test that binds a socket uses its own port: the test harness runs
    // tests in parallel by default, so shared ports make them interfere.
    use core::time;
    use std::{cell::RefCell, ffi::CString, rc::Rc, thread};

    use crate::knet;

    /// Channel handler that ignores every event.
    fn ignore_events(_chan: &mut knet::Channel, _e: knet::ChannelCbEvent) {}

    /// Creates a loop or fails the test.
    fn new_loop() -> knet::Loop {
        knet::Loop::new().expect("loop is null")
    }

    /// Gives the worker thread time to act, then delivers pending notifications.
    fn pump(net: &std::sync::Arc<std::sync::Mutex<knet::KnetNetwork>>) {
        thread::sleep(time::Duration::from_millis(100));
        for _ in 1..10 {
            net.lock().unwrap().tick();
        }
    }

    /// Installs an `on_accept` handler asserting a successful listen on 127.0.0.1:`port`.
    fn expect_listen_on(
        net: &std::sync::Arc<std::sync::Mutex<knet::KnetNetwork>>,
        port: i32,
    ) {
        net.lock().unwrap().on_accept(
            move |_network: &knet::KnetNetwork, _ip: &str, _port: i32, success: bool| {
                assert!(success);
                assert!(_ip == "127.0.0.1");
                assert!(_port == port);
            },
        );
    }

    /// Installs an `on_connect` handler asserting a successful local connection.
    fn expect_connected(net: &std::sync::Arc<std::sync::Mutex<knet::KnetNetwork>>) {
        net.lock().unwrap().on_connect(
            |_network: &knet::KnetNetwork, _chan_id: u64, _ip: &str, _port: i32, success: bool| {
                assert!(success);
                assert!(_ip == "127.0.0.1");
                assert!(_port != 0);
                assert!(_chan_id != 0);
            },
        );
    }

    #[test]
    fn test_loop_lifecycle() {
        unsafe {
            let loop_ptr = knet::knet_loop_create();
            assert!(!loop_ptr.is_null(), "loop is null");
            knet::knet_loop_destroy(loop_ptr);
        }
    }

    #[test]
    fn test_loop_create_channel() {
        unsafe {
            let loop_ptr = knet::knet_loop_create();
            assert!(!loop_ptr.is_null(), "loop is null");
            let channel_ptr = knet::knet_loop_create_channel(loop_ptr, 1024, 1024);
            assert!(!channel_ptr.is_null(), "channel is null");
            knet::knet_channel_ref_close(channel_ptr);
            knet::knet_loop_destroy(loop_ptr);
        }
    }

    #[test]
    fn test_channel_accept() {
        unsafe {
            let loop_ptr = knet::knet_loop_create();
            assert!(!loop_ptr.is_null(), "loop is null");
            let channel_ptr = knet::knet_loop_create_channel(loop_ptr, 1024, 1024);
            assert!(!channel_ptr.is_null(), "channel is null");
            let ip = CString::new("127.0.0.1").unwrap();
            let e = knet::knet_channel_ref_accept(channel_ptr, ip.as_ptr(), 58870, 512);
            assert_eq!(e, 0, "channel accept failed");
            knet::knet_channel_ref_close(channel_ptr);
            knet::knet_loop_destroy(loop_ptr);
        }
    }

    #[test]
    fn test_loop_create_rust() {
        assert!(knet::Loop::new().is_some(), "loop is null");
    }

    #[test]
    fn test_loop_destroy_rust() {
        let mut l = new_loop();
        l.destroy();
    }

    #[test]
    fn test_accept() {
        let l = new_loop();
        let acceptor = l.accept("127.0.0.1", 58871, 512, ignore_events);
        assert!(acceptor.is_some(), "accept failed");
    }

    #[test]
    fn test_connect() {
        let l = new_loop();
        let acceptor = l.accept("127.0.0.1", 58872, 512, ignore_events);
        assert!(acceptor.is_some(), "accept failed");
        let connector = l.connect("127.0.0.1", 58872, 1, ignore_events);
        assert!(connector.is_some(), "connect failed");
        l.tick();
    }

    #[test]
    fn test_get_uuid() {
        let l = new_loop();
        let acceptor = l.accept("127.0.0.1", 58873, 512, ignore_events);
        assert!(acceptor.is_some(), "accept failed");
        assert!(acceptor.unwrap().as_ref().borrow_mut().get_uuid() != 0);
    }

    #[test]
    fn test_available() {
        let l = new_loop();
        let acceptor = l.accept("127.0.0.1", 58874, 512, ignore_events);
        assert!(acceptor.is_some(), "accept failed");
        let connector = l.connect("127.0.0.1", 58874, 1, ignore_events);
        assert!(connector.is_some(), "connect failed");
        l.tick();
        assert!(connector.unwrap().as_ref().borrow_mut().available() == 0);
    }

    #[test]
    fn test_address() {
        let l = new_loop();
        let acceptor = l.accept("127.0.0.1", 58875, 512, ignore_events);
        assert!(acceptor.is_some(), "accept failed");
        let connector = l.connect("127.0.0.1", 58875, 1, ignore_events);
        assert!(connector.is_some(), "connect failed");
        l.tick();
        let mut_conn = connector.unwrap();
        let peer_addr = mut_conn.as_ref().borrow_mut().get_peer_address();
        assert!(peer_addr.0 == "127.0.0.1");
        assert!(peer_addr.1 == 58875);
        let local_addr = mut_conn.as_ref().borrow_mut().get_local_address();
        assert!(local_addr.0 == "127.0.0.1");
        assert!(local_addr.1 != 58875);
    }

    #[test]
    fn test_cb() {
        let l = new_loop();
        let accepted = Rc::new(RefCell::new(false));
        let connected = Rc::new(RefCell::new(false));
        let clone_accepted = accepted.clone();
        let clone_connected = connected.clone();
        let acceptor = l.accept(
            "127.0.0.1",
            58876,
            512,
            move |_chan: &mut knet::Channel, _e: knet::ChannelCbEvent| {
                (*accepted.as_ref().borrow_mut()) = true;
            },
        );
        assert!(acceptor.is_some(), "accept failed");
        let connector = l.connect(
            "127.0.0.1",
            58876,
            1,
            move |_chan: &mut knet::Channel, _e: knet::ChannelCbEvent| {
                (*connected.as_ref().borrow_mut()) = true;
            },
        );
        assert!(connector.is_some(), "connect failed");
        l.tick();
        l.tick();
        assert!(*clone_connected.borrow());
        assert!(*clone_accepted.borrow());
    }

    #[test]
    fn test_close() {
        let l = new_loop();
        let accepted = Rc::new(RefCell::new(false));
        let connected = Rc::new(RefCell::new(false));
        let clone_accepted = accepted.clone();
        let clone_connected = connected.clone();
        let acceptor = l.accept(
            "127.0.0.1",
            58877,
            512,
            move |_chan: &mut knet::Channel, _e: knet::ChannelCbEvent| {
                (*accepted.as_ref().borrow_mut()) = true;
            },
        );
        assert!(acceptor.is_some(), "accept failed");
        let connector = l.connect(
            "127.0.0.1",
            58877,
            1,
            move |_chan: &mut knet::Channel, _e: knet::ChannelCbEvent| {
                (*connected.as_ref().borrow_mut()) = true;
            },
        );
        assert!(connector.is_some(), "connect failed");
        l.tick();
        l.tick();
        assert!(*clone_connected.borrow());
        assert!(*clone_accepted.borrow());
        acceptor.unwrap().as_ref().borrow_mut().close();
        connector.unwrap().as_ref().borrow_mut().close();
    }

    #[test]
    fn test_read_write() {
        let l = new_loop();
        let read = Rc::new(RefCell::new(false));
        let wrote = Rc::new(RefCell::new(false));
        let clone_read = read.clone();
        let clone_wrote = wrote.clone();
        let acceptor = l.accept(
            "127.0.0.1",
            58880,
            512,
            move |_chan: &mut knet::Channel, _e: knet::ChannelCbEvent| match _e {
                knet::ChannelCbEvent::ChannelCbEventRecv
                | knet::ChannelCbEvent::ChannelCbEventAccept
                | knet::ChannelCbEvent::ChannelCbEventConnectFailed
                | knet::ChannelCbEvent::ChannelCbEventAcceptFailed => (),
                knet::ChannelCbEvent::ChannelCbEventConnect
                | knet::ChannelCbEvent::ChannelCbEventSend
                | knet::ChannelCbEvent::ChannelCbEventClose
                | knet::ChannelCbEvent::ChannelCbEventTimeout
                | knet::ChannelCbEvent::ChannelCbEventConnectTimeout => {
                    println!("{}", _e as i32)
                }
                knet::ChannelCbEvent::ChannelCbEventAcceptClient => {
                    let vec: Vec<u8> = vec![1, 2, 3];
                    for _ in 1..10 {
                        let count = _chan.write(&vec);
                        assert!(count == 3);
                    }
                    (*wrote.as_ref().borrow_mut()) = true;
                }
            },
        );
        assert!(acceptor.is_some(), "accept failed");
        let connector = l.connect(
            "127.0.0.1",
            58880,
            1,
            move |_chan: &mut knet::Channel, _e: knet::ChannelCbEvent| {
                if _e == knet::ChannelCbEvent::ChannelCbEventRecv {
                    (*read.as_ref().borrow_mut()) = true;
                }
            },
        );
        assert!(connector.is_some(), "connect failed");
        for _ in 1..10 {
            l.tick();
        }
        assert!(*clone_wrote.borrow());
        assert!(*clone_read.borrow());
    }

    #[test]
    fn test_accept_failed() {
        let l = new_loop();
        let acceptor = l.accept(
            "1.2.3.4",
            52121,
            1,
            move |_chan: &mut knet::Channel, _e: knet::ChannelCbEvent| {
                assert!(_e == knet::ChannelCbEvent::ChannelCbEventAcceptFailed)
            },
        );
        assert!(acceptor.is_none(), "accept unexpectedly succeeded");
        l.tick();
    }

    #[test]
    fn test_network_start_stop() {
        let net = knet::KnetNetwork::new();
        net.lock().unwrap().start();
        net.lock().unwrap().stop();
    }

    #[test]
    fn test_network_listen() {
        let net = knet::KnetNetwork::new();
        net.lock().unwrap().start();
        expect_listen_on(&net, 58881);
        net.lock().unwrap().listen(String::from("127.0.0.1"), 58881);
        pump(&net);
    }

    #[test]
    fn test_network_listen_connect() {
        let net = knet::KnetNetwork::new();
        net.lock().unwrap().start();
        expect_listen_on(&net, 58882);
        expect_connected(&net);
        net.lock().unwrap().listen(String::from("127.0.0.1"), 58882);
        net.lock()
            .unwrap()
            .connect(String::from("127.0.0.1"), 58882);
        pump(&net);
    }

    #[test]
    fn test_network_read_write() {
        let net = knet::KnetNetwork::new();
        net.lock().unwrap().start();
        expect_listen_on(&net, 58883);
        net.lock()
            .unwrap()
            .on_client(|_net: &knet::KnetNetwork, _chan_id: u64| {
                let vec: Vec<u8> = vec![1, 2, 3];
                _net.send(_chan_id, &vec);
            });
        expect_connected(&net);
        net.lock()
            .unwrap()
            .on_data(|_net: &knet::KnetNetwork, _chan_id: u64, _data: &Vec<u8>| {
                assert!(_data.len() == 3);
            });
        net.lock().unwrap().listen(String::from("127.0.0.1"), 58883);
        net.lock()
            .unwrap()
            .connect(String::from("127.0.0.1"), 58883);
        pump(&net);
        pump(&net);
    }

    #[test]
    fn test_network_close() {
        let net = knet::KnetNetwork::new();
        net.lock().unwrap().start();
        expect_listen_on(&net, 58884);
        net.lock()
            .unwrap()
            .on_client(|_net: &knet::KnetNetwork, _chan_id: u64| {
                _net.close(_chan_id);
            });
        expect_connected(&net);
        net.lock()
            .unwrap()
            .on_close(|_net: &knet::KnetNetwork, _chan_id: u64| {});
        net.lock().unwrap().listen(String::from("127.0.0.1"), 58884);
        net.lock()
            .unwrap()
            .connect(String::from("127.0.0.1"), 58884);
        pump(&net);
        pump(&net);
    }
}