1use crate::{connection::Connection, handshake, message, Context, Message, Status};
2use anyhow::bail;
3use bytes::Bytes;
4use flume::SendError;
5use std::fmt::{Debug, Formatter};
6use std::sync::{atomic::AtomicUsize, Arc};
7use tokio::sync::RwLock;
8
9pub struct Client {
10 context: Context,
11}
12
13#[derive(Debug)]
14pub struct Config {
15 pub path: String,
17
18 pub encryption: bool,
20
21 pub connection_timeout: u64,
24
25 pub retry_delay: u64,
28
29 pub max_msg_length: usize,
33}
34
35impl Default for Config {
36 fn default() -> Self {
37 Config {
38 path: "".to_owned(),
39 encryption: true,
40 connection_timeout: crate::DEFAULT_CONNECTION_TIMEOUT,
41 retry_delay: crate::DEFAULT_RETRY_DELAY,
42 max_msg_length: crate::DEFAULT_MSG_LENGTH,
43 }
44 }
45}
46
47impl Client {
48 pub fn new(config: Config) -> Self {
49 Client {
50 context: Context {
51 path: Arc::new(config.path),
52 channel: None,
53 max_msg_length: Arc::new(AtomicUsize::new(config.max_msg_length)),
54 encryption: config.encryption,
55 status: Arc::new(RwLock::<Status>::new(Status::NotConnected)),
56 connection_timeout: config.connection_timeout,
57 retry_delay: config.retry_delay,
58 },
59 }
60 }
61
62 pub async fn run(&mut self) -> crate::Result<()> {
63 if self.context.get_status().await != Status::NotConnected {
64 bail!("Can not run twice!");
65 }
66
67 let (left, right) = crate::bichannel::channel::<Message, Message>(100);
68
69 let mut context = self.context.clone();
72
73 self.context.set_channel(left);
75 context.set_channel(right);
76
77 tokio::spawn(async move {
78 let mut context: Context = context;
79 if let Err(err) = context.run().await {
80 context.report_error_status(err).await;
81 }
82 });
83
84 Ok(())
85 }
86
87 pub fn new_message(&self, msg_type: i32, data: Bytes) -> Message {
88 Message::new(msg_type, data, self.context.encryption)
89 }
90
91 pub async fn recv_message(&self) -> Option<Message> {
92 self.context.recv_message().await
93 }
94
95 pub async fn send_message(&self, message: Message) -> Result<(), SendError<Message>> {
96 self.context.send_message(message).await
97 }
98
99 pub async fn get_status(&mut self) -> Status {
100 self.context.get_status().await
101 }
102}
103
104impl Context {
105 async fn run(&mut self) -> crate::Result<()> {
106 self.report_status(Status::Connecting).await?;
107
108 loop {
109 let mut connection = Connection::connect(
110 self.path.as_ref(),
111 self.connection_timeout,
112 self.retry_delay,
113 )
114 .await?;
115
116 self.report_status(Status::Connected).await?;
117
118 handshake::client::start(&mut connection, self).await?;
119
120 message::message_loop(connection, self).await?;
121
122 if self.get_status().await != Status::ReConnecting {
124 break;
125 }
126 }
127
128 self.report_status(Status::Closed).await?;
129
130 Ok(())
131 }
132}
133
134impl Debug for Client {
135 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
136 write!(f, "Client")
137 }
138}