1use crate::{connection::Connection, handshake, message, Context, Message, Status};
2use anyhow::bail;
3use bytes::Bytes;
4use flume::SendError;
5use std::sync::{atomic::AtomicUsize, Arc};
6use tokio::sync::RwLock;
7
8#[derive(Default, Debug)]
9pub struct Server {
10 context: Context,
11}
12
13#[derive(Debug)]
14pub struct Config {
15 pub path: String,
17
18 pub encryption: bool,
20
21 pub max_msg_length: usize,
23}
24
25impl Default for Config {
26 fn default() -> Self {
27 Config {
28 path: "".to_owned(),
29 encryption: true,
30 max_msg_length: crate::DEFAULT_MSG_LENGTH,
31 }
32 }
33}
34
35impl Server {
36 pub fn new(config: Config) -> Self {
37 Server {
38 context: Context {
39 path: Arc::new(config.path),
40 channel: None,
41 max_msg_length: Arc::new(AtomicUsize::new(config.max_msg_length)),
42 encryption: config.encryption,
43 status: Arc::new(RwLock::<Status>::new(Status::NotConnected)),
44 connection_timeout: 0, retry_delay: 0, },
47 }
48 }
49
50 pub async fn run(&mut self) -> crate::Result<()> {
51 if self.context.get_status().await != Status::NotConnected {
52 bail!("Can not run twice!");
53 }
54
55 let (left, right) = crate::bichannel::channel::<Message, Message>(100);
57
58 let mut context = self.context.clone();
61
62 self.context.set_channel(left);
64 context.set_channel(right);
65
66 tokio::spawn(async move {
67 let mut context: Context = context;
68 if let Err(err) = Server::run_server(&mut context).await {
69 context.report_error_status(err).await;
70 }
71 });
72
73 Ok(())
74 }
75
76 async fn run_server(context: &mut Context) -> crate::Result<()> {
77 context.report_status(Status::Listening).await?;
78
79 let mut listener = Connection::listen(context.path.as_ref()).await?;
80
81 loop {
82 let mut connection = match listener.next_connection().await? {
83 Some(connection) => connection,
84 None => break,
85 };
86
87 context.report_status(Status::Connected).await?;
88
89 if let Err(err) = handshake::server::start(&mut connection, context).await {
90 context.report_error_status(err).await;
91 }
92
93 if let Err(err) = message::message_loop(connection, context).await {
94 context.report_error_status(err).await;
95 }
96
97 context.report_status(Status::Listening).await?;
98 }
99
100 Ok(())
101 }
102
103 pub fn new_message(&self, msg_type: i32, data: Bytes) -> Message {
104 Message::new(msg_type, data, self.context.encryption)
105 }
106
107 pub async fn recv_message(&self) -> Option<Message> {
108 self.context.recv_message().await
109 }
110
111 pub async fn send_message(&self, message: Message) -> Result<(), SendError<Message>> {
112 self.context.send_message(message).await
113 }
114}