golang_ipc_rs/
server.rs

1use crate::{connection::Connection, handshake, message, Context, Message, Status};
2use anyhow::bail;
3use bytes::Bytes;
4use flume::SendError;
5use std::sync::{atomic::AtomicUsize, Arc};
6use tokio::sync::RwLock;
7
8#[derive(Default, Debug)]
9pub struct Server {
10    context: Context,
11}
12
/// Settings consumed by `Server::new`.
#[derive(Debug)]
pub struct Config {
    /// Socket path the server listens on.
    pub path: String,

    /// Whether encryption is required.
    pub encryption: bool,

    /// Enforced maximum message length.
    pub max_msg_length: usize,
}
24
25impl Default for Config {
26    fn default() -> Self {
27        Config {
28            path: "".to_owned(),
29            encryption: true,
30            max_msg_length: crate::DEFAULT_MSG_LENGTH,
31        }
32    }
33}
34
35impl Server {
36    pub fn new(config: Config) -> Self {
37        Server {
38            context: Context {
39                path: Arc::new(config.path),
40                channel: None,
41                max_msg_length: Arc::new(AtomicUsize::new(config.max_msg_length)),
42                encryption: config.encryption,
43                status: Arc::new(RwLock::<Status>::new(Status::NotConnected)),
44                connection_timeout: 0, // Ignored by the server
45                retry_delay: 0,        // Ignored by the server
46            },
47        }
48    }
49
50    pub async fn run(&mut self) -> crate::Result<()> {
51        if self.context.get_status().await != Status::NotConnected {
52            bail!("Can not run twice!");
53        }
54
55        // TODO: add size to config
56        let (left, right) = crate::bichannel::channel::<Message, Message>(100);
57
58        // Clone the context.
59        //  One will stay with the Server struct, and the second will move into the async task.
60        let mut context = self.context.clone();
61
62        // Give each context the correct side of the channel.
63        self.context.set_channel(left);
64        context.set_channel(right);
65
66        tokio::spawn(async move {
67            let mut context: Context = context;
68            if let Err(err) = Server::run_server(&mut context).await {
69                context.report_error_status(err).await;
70            }
71        });
72
73        Ok(())
74    }
75
76    async fn run_server(context: &mut Context) -> crate::Result<()> {
77        context.report_status(Status::Listening).await?;
78
79        let mut listener = Connection::listen(context.path.as_ref()).await?;
80
81        loop {
82            let mut connection = match listener.next_connection().await? {
83                Some(connection) => connection,
84                None => break,
85            };
86
87            context.report_status(Status::Connected).await?;
88
89            if let Err(err) = handshake::server::start(&mut connection, context).await {
90                context.report_error_status(err).await;
91            }
92
93            if let Err(err) = message::message_loop(connection, context).await {
94                context.report_error_status(err).await;
95            }
96
97            context.report_status(Status::Listening).await?;
98        }
99
100        Ok(())
101    }
102
103    pub fn new_message(&self, msg_type: i32, data: Bytes) -> Message {
104        Message::new(msg_type, data, self.context.encryption)
105    }
106
107    pub async fn recv_message(&self) -> Option<Message> {
108        self.context.recv_message().await
109    }
110
111    pub async fn send_message(&self, message: Message) -> Result<(), SendError<Message>> {
112        self.context.send_message(message).await
113    }
114}