rpc_core_net/
tcp_server.rs

1use std::cell::RefCell;
2use std::net::SocketAddr;
3use std::rc::{Rc, Weak};
4
5use log::{debug, trace};
6use tokio::net::TcpListener;
7use tokio::select;
8use tokio::sync::Notify;
9
10use crate::config::TcpConfig;
11use crate::detail::tcp_channel::TcpChannel;
12
13pub struct TcpServer {
14    port: RefCell<u16>,
15    config: Rc<RefCell<TcpConfig>>,
16    on_session: RefCell<Option<Box<dyn Fn(Weak<TcpChannel>)>>>,
17    quit_notify: Notify,
18    this: RefCell<Weak<Self>>,
19}
20
21// public
22impl TcpServer {
23    pub fn new(port: u16, config: TcpConfig) -> Rc<Self> {
24        let config = Rc::new(RefCell::new(config));
25        let r = Rc::new(Self {
26            port: port.into(),
27            config: config.clone(),
28            on_session: None.into(),
29            quit_notify: Notify::new(),
30            this: Weak::new().into(),
31        });
32        let this_weak = Rc::downgrade(&r);
33        *r.this.borrow_mut() = this_weak.clone().into();
34        r
35    }
36
37    pub fn downgrade(&self) -> Weak<Self> {
38        self.this.borrow().clone()
39    }
40
41    pub fn start(&self) {
42        self.config.borrow_mut().init();
43        let port = *self.port.borrow();
44
45        let this_weak = self.this.borrow().clone();
46
47        tokio::task::spawn_local(async move {
48            debug!("listen: {port}");
49            let listener = TcpListener::bind(SocketAddr::new("0.0.0.0".parse().unwrap(), port)).await.unwrap();
50            loop {
51                let this = this_weak.upgrade().unwrap();
52                select! {
53                    res = listener.accept() => {
54                        match res {
55                            Ok((stream, addr))=> {
56                                debug!("accept addr: {addr}");
57                                tokio::task::spawn_local(async move {
58                                    let session = TcpChannel::new(this.config.clone());
59                                    if let Some(on_session) = this.on_session.borrow_mut().as_ref() {
60                                        session.do_open(stream);
61                                        on_session(Rc::downgrade(&session));
62                                    }
63                                });
64                            },
65                            Err(e) => {
66                                println!("Error accepting connection: {}", e);
67                            }
68                        }
69                    }
70                    _ = this.quit_notify.notified() => {
71                        trace!("server: stop");
72                        break;
73                    }
74                }
75            }
76        });
77    }
78
79    pub fn stop(&self) {
80        self.quit_notify.notify_one();
81    }
82
83    pub fn on_session<F>(&self, callback: F)
84        where F: Fn(Weak<TcpChannel>) + 'static,
85    {
86        *self.on_session.borrow_mut() = Some(Box::new(callback));
87    }
88}
89