rpc_core/net/
tcp_server.rs1use std::cell::RefCell;
2use std::net::SocketAddr;
3use std::rc::{Rc, Weak};
4
5use log::{debug, trace};
6use tokio::net::TcpListener;
7use tokio::select;
8use tokio::sync::Notify;
9
10use crate::net::config::TcpConfig;
11use crate::net::detail::tcp_channel::TcpChannel;
12
13pub struct TcpServer {
14 port: RefCell<u16>,
15 config: Rc<RefCell<TcpConfig>>,
16 on_session: RefCell<Option<Box<dyn Fn(Weak<TcpChannel>)>>>,
17 quit_notify: Notify,
18 this: RefCell<Weak<Self>>,
19}
20
21impl TcpServer {
23 pub fn new(port: u16, config: TcpConfig) -> Rc<Self> {
24 let config = Rc::new(RefCell::new(config));
25 let r = Rc::new(Self {
26 port: port.into(),
27 config: config.clone(),
28 on_session: None.into(),
29 quit_notify: Notify::new(),
30 this: Weak::new().into(),
31 });
32 let this_weak = Rc::downgrade(&r);
33 *r.this.borrow_mut() = this_weak.clone().into();
34 r
35 }
36
37 pub fn downgrade(&self) -> Weak<Self> {
38 self.this.borrow().clone()
39 }
40
41 pub fn start(&self) {
42 self.config.borrow_mut().init();
43 let port = *self.port.borrow();
44
45 let this_weak = self.this.borrow().clone();
46
47 tokio::task::spawn_local(async move {
48 debug!("listen: {port}");
49 let listener = TcpListener::bind(SocketAddr::new("0.0.0.0".parse().unwrap(), port))
50 .await
51 .unwrap();
52 loop {
53 let this = this_weak.upgrade().unwrap();
54 select! {
55 res = listener.accept() => {
56 match res {
57 Ok((stream, addr))=> {
58 debug!("accept addr: {addr}");
59 tokio::task::spawn_local(async move {
60 let session = TcpChannel::new(this.config.clone());
61 if let Some(on_session) = this.on_session.borrow_mut().as_ref() {
62 session.do_open(stream);
63 on_session(Rc::downgrade(&session));
64 }
65 });
66 },
67 Err(e) => {
68 println!("Error accepting connection: {}", e);
69 }
70 }
71 }
72 _ = this.quit_notify.notified() => {
73 trace!("server: stop");
74 break;
75 }
76 }
77 }
78 });
79 }
80
81 pub fn stop(&self) {
82 self.quit_notify.notify_one();
83 }
84
85 pub fn on_session<F>(&self, callback: F)
86 where
87 F: Fn(Weak<TcpChannel>) + 'static,
88 {
89 *self.on_session.borrow_mut() = Some(Box::new(callback));
90 }
91}