golang_ipc_rs/
client.rs

1use crate::{connection::Connection, handshake, message, Context, Message, Status};
2use anyhow::bail;
3use bytes::Bytes;
4use flume::SendError;
5use std::fmt::{Debug, Formatter};
6use std::sync::{atomic::AtomicUsize, Arc};
7use tokio::sync::RwLock;
8
9pub struct Client {
10    context: Context,
11}
12
13#[derive(Debug)]
14pub struct Config {
15    // The socket path to connect
16    pub path: String,
17
18    // Flag to indicate if encryption is required
19    pub encryption: bool,
20
21    // Seconds to wait before failing to connect
22    //  A value of `0` waits forever.
23    pub connection_timeout: u64,
24
25    // Number of seconds to wait between connection attempts
26    //  Must be non-zero
27    pub retry_delay: u64,
28
29    // Initial max msg length
30    //  This will be overridden by the server.  The only impact here is for any messages created
31    //  before the initial connection.
32    pub max_msg_length: usize,
33}
34
35impl Default for Config {
36    fn default() -> Self {
37        Config {
38            path: "".to_owned(),
39            encryption: true,
40            connection_timeout: crate::DEFAULT_CONNECTION_TIMEOUT,
41            retry_delay: crate::DEFAULT_RETRY_DELAY,
42            max_msg_length: crate::DEFAULT_MSG_LENGTH,
43        }
44    }
45}
46
47impl Client {
48    pub fn new(config: Config) -> Self {
49        Client {
50            context: Context {
51                path: Arc::new(config.path),
52                channel: None,
53                max_msg_length: Arc::new(AtomicUsize::new(config.max_msg_length)),
54                encryption: config.encryption,
55                status: Arc::new(RwLock::<Status>::new(Status::NotConnected)),
56                connection_timeout: config.connection_timeout,
57                retry_delay: config.retry_delay,
58            },
59        }
60    }
61
62    pub async fn run(&mut self) -> crate::Result<()> {
63        if self.context.get_status().await != Status::NotConnected {
64            bail!("Can not run twice!");
65        }
66
67        let (left, right) = crate::bichannel::channel::<Message, Message>(100);
68
69        // Clone the context.
70        //  One will stay with the Server strut, and the second will move into the async task.
71        let mut context = self.context.clone();
72
73        // Give each context the correct side of the channel.
74        self.context.set_channel(left);
75        context.set_channel(right);
76
77        tokio::spawn(async move {
78            let mut context: Context = context;
79            if let Err(err) = context.run().await {
80                context.report_error_status(err).await;
81            }
82        });
83
84        Ok(())
85    }
86
87    pub fn new_message(&self, msg_type: i32, data: Bytes) -> Message {
88        Message::new(msg_type, data, self.context.encryption)
89    }
90
91    pub async fn recv_message(&self) -> Option<Message> {
92        self.context.recv_message().await
93    }
94
95    pub async fn send_message(&self, message: Message) -> Result<(), SendError<Message>> {
96        self.context.send_message(message).await
97    }
98
99    pub async fn get_status(&mut self) -> Status {
100        self.context.get_status().await
101    }
102}
103
104impl Context {
105    async fn run(&mut self) -> crate::Result<()> {
106        self.report_status(Status::Connecting).await?;
107
108        loop {
109            let mut connection = Connection::connect(
110                self.path.as_ref(),
111                self.connection_timeout,
112                self.retry_delay,
113            )
114            .await?;
115
116            self.report_status(Status::Connected).await?;
117
118            handshake::client::start(&mut connection, self).await?;
119
120            message::message_loop(connection, self).await?;
121
122            // If the remote disconnected, we will attempt to connect again
123            if self.get_status().await != Status::ReConnecting {
124                break;
125            }
126        }
127
128        self.report_status(Status::Closed).await?;
129
130        Ok(())
131    }
132}
133
134impl Debug for Client {
135    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
136        write!(f, "Client")
137    }
138}