rpc_core/net/
tcp_server.rs

1use std::cell::RefCell;
2use std::net::SocketAddr;
3use std::rc::{Rc, Weak};
4
5use log::{debug, trace};
6use tokio::net::TcpListener;
7use tokio::select;
8use tokio::sync::Notify;
9
10use crate::net::config::TcpConfig;
11use crate::net::detail::tcp_channel::TcpChannel;
12
13pub struct TcpServer {
14    port: RefCell<u16>,
15    config: Rc<RefCell<TcpConfig>>,
16    on_session: RefCell<Option<Box<dyn Fn(Weak<TcpChannel>)>>>,
17    quit_notify: Notify,
18    this: RefCell<Weak<Self>>,
19}
20
// public
22impl TcpServer {
23    pub fn new(port: u16, config: TcpConfig) -> Rc<Self> {
24        let config = Rc::new(RefCell::new(config));
25        let r = Rc::new(Self {
26            port: port.into(),
27            config: config.clone(),
28            on_session: None.into(),
29            quit_notify: Notify::new(),
30            this: Weak::new().into(),
31        });
32        let this_weak = Rc::downgrade(&r);
33        *r.this.borrow_mut() = this_weak.clone().into();
34        r
35    }
36
37    pub fn downgrade(&self) -> Weak<Self> {
38        self.this.borrow().clone()
39    }
40
41    pub fn start(&self) {
42        self.config.borrow_mut().init();
43        let port = *self.port.borrow();
44
45        let this_weak = self.this.borrow().clone();
46
47        tokio::task::spawn_local(async move {
48            debug!("listen: {port}");
49            let listener = TcpListener::bind(SocketAddr::new("0.0.0.0".parse().unwrap(), port))
50                .await
51                .unwrap();
52            loop {
53                let this = this_weak.upgrade().unwrap();
54                select! {
55                    res = listener.accept() => {
56                        match res {
57                            Ok((stream, addr))=> {
58                                debug!("accept addr: {addr}");
59                                tokio::task::spawn_local(async move {
60                                    let session = TcpChannel::new(this.config.clone());
61                                    if let Some(on_session) = this.on_session.borrow_mut().as_ref() {
62                                        session.do_open(stream);
63                                        on_session(Rc::downgrade(&session));
64                                    }
65                                });
66                            },
67                            Err(e) => {
68                                println!("Error accepting connection: {}", e);
69                            }
70                        }
71                    }
72                    _ = this.quit_notify.notified() => {
73                        trace!("server: stop");
74                        break;
75                    }
76                }
77            }
78        });
79    }
80
81    pub fn stop(&self) {
82        self.quit_notify.notify_one();
83    }
84
85    pub fn on_session<F>(&self, callback: F)
86    where
87        F: Fn(Weak<TcpChannel>) + 'static,
88    {
89        *self.on_session.borrow_mut() = Some(Box::new(callback));
90    }
91}