rpc_core_net/
tcp_server.rs1use std::cell::RefCell;
2use std::net::SocketAddr;
3use std::rc::{Rc, Weak};
4
5use log::{debug, trace};
6use tokio::net::TcpListener;
7use tokio::select;
8use tokio::sync::Notify;
9
10use crate::config::TcpConfig;
11use crate::detail::tcp_channel::TcpChannel;
12
13pub struct TcpServer {
14 port: RefCell<u16>,
15 config: Rc<RefCell<TcpConfig>>,
16 on_session: RefCell<Option<Box<dyn Fn(Weak<TcpChannel>)>>>,
17 quit_notify: Notify,
18 this: RefCell<Weak<Self>>,
19}
20
21impl TcpServer {
23 pub fn new(port: u16, config: TcpConfig) -> Rc<Self> {
24 let config = Rc::new(RefCell::new(config));
25 let r = Rc::new(Self {
26 port: port.into(),
27 config: config.clone(),
28 on_session: None.into(),
29 quit_notify: Notify::new(),
30 this: Weak::new().into(),
31 });
32 let this_weak = Rc::downgrade(&r);
33 *r.this.borrow_mut() = this_weak.clone().into();
34 r
35 }
36
37 pub fn downgrade(&self) -> Weak<Self> {
38 self.this.borrow().clone()
39 }
40
41 pub fn start(&self) {
42 self.config.borrow_mut().init();
43 let port = *self.port.borrow();
44
45 let this_weak = self.this.borrow().clone();
46
47 tokio::task::spawn_local(async move {
48 debug!("listen: {port}");
49 let listener = TcpListener::bind(SocketAddr::new("0.0.0.0".parse().unwrap(), port)).await.unwrap();
50 loop {
51 let this = this_weak.upgrade().unwrap();
52 select! {
53 res = listener.accept() => {
54 match res {
55 Ok((stream, addr))=> {
56 debug!("accept addr: {addr}");
57 tokio::task::spawn_local(async move {
58 let session = TcpChannel::new(this.config.clone());
59 if let Some(on_session) = this.on_session.borrow_mut().as_ref() {
60 session.do_open(stream);
61 on_session(Rc::downgrade(&session));
62 }
63 });
64 },
65 Err(e) => {
66 println!("Error accepting connection: {}", e);
67 }
68 }
69 }
70 _ = this.quit_notify.notified() => {
71 trace!("server: stop");
72 break;
73 }
74 }
75 }
76 });
77 }
78
79 pub fn stop(&self) {
80 self.quit_notify.notify_one();
81 }
82
83 pub fn on_session<F>(&self, callback: F)
84 where F: Fn(Weak<TcpChannel>) + 'static,
85 {
86 *self.on_session.borrow_mut() = Some(Box::new(callback));
87 }
88}
89