1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use rand::rngs::StdRng;
5use rand::Rng;
6use rand::SeedableRng;
7use tonic::codec::CompressionEncoding;
8use tonic::transport::Channel;
9use tracing::debug;
10use tracing::error;
11
12use super::ClientInner;
13use crate::proto::client::raft_client_service_client::RaftClientServiceClient;
14use crate::proto::client::ClientReadRequest;
15use crate::proto::client::ClientResult;
16use crate::proto::client::ClientWriteRequest;
17use crate::proto::client::WriteCommand;
18use crate::proto::error::ErrorCode;
19use crate::scoped_timer::ScopedTimer;
20use crate::ClientApiError;
21
22#[derive(Clone)]
27pub struct KvClient {
28 pub(super) client_inner: Arc<ArcSwap<ClientInner>>,
29}
30
31impl KvClient {
32 pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
33 Self { client_inner }
34 }
35
36 pub async fn put(
42 &self,
43 key: impl AsRef<[u8]>,
44 value: impl AsRef<[u8]>,
45 ) -> std::result::Result<(), ClientApiError> {
46 let _timer = ScopedTimer::new("client::put");
47
48 let client_inner = self.client_inner.load();
49
50 let mut commands = Vec::new();
52 let client_command_insert = WriteCommand::insert(key, value);
53 commands.push(client_command_insert);
54
55 let request = ClientWriteRequest {
56 client_id: client_inner.client_id,
57 commands,
58 };
59
60 let mut client = self.make_leader_client().await?;
61 match client.handle_client_write(request).await {
63 Ok(response) => {
64 debug!("[:KvClient:write] response: {:?}", response);
65 let client_response = response.get_ref();
66 client_response.validate_error()
67 }
68 Err(status) => {
69 error!("[:KvClient:write] status: {:?}", status);
70 Err(status.into())
71 }
72 }
73 }
74
75 pub async fn delete(
87 &self,
88 key: impl AsRef<[u8]>,
89 ) -> std::result::Result<(), ClientApiError> {
90 let client_inner = self.client_inner.load();
91 let mut commands = Vec::new();
93 let client_command_insert = WriteCommand::delete(key);
94 commands.push(client_command_insert);
95
96 let request = ClientWriteRequest {
97 client_id: client_inner.client_id,
98 commands,
99 };
100
101 let mut client = self.make_leader_client().await?;
102
103 match client.handle_client_write(request).await {
105 Ok(response) => {
106 debug!("[:KvClient:delete] response: {:?}", response);
107 let client_response = response.get_ref();
108 client_response.validate_error()
109 }
110 Err(status) => {
111 error!("[:KvClient:delete] status: {:?}", status);
112 Err(status.into())
113 }
114 }
115 }
116
117 pub async fn get(
128 &self,
129 key: impl AsRef<[u8]>,
130 linear: bool,
131 ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
132 let mut results = self.get_multi(std::iter::once(key), linear).await?;
134
135 Ok(results.pop().unwrap_or(None))
137 }
138 pub async fn get_multi(
151 &self,
152 keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
153 linear: bool,
154 ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
155 let client_inner = self.client_inner.load();
156 let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
158
159 if keys.is_empty() {
161 return Err(ErrorCode::InvalidRequest.into());
162 }
163
164 let mut client = if linear {
166 self.make_leader_client().await?
167 } else {
168 self.make_client().await?
169 };
170
171 let request = ClientReadRequest {
173 client_id: client_inner.client_id,
174 linear,
175 keys,
176 };
177
178 match client.handle_client_read(request).await {
180 Ok(response) => {
181 debug!("Read response: {:?}", response);
182 response.into_inner().into_read_results()
183 }
184 Err(status) => {
185 error!("Read request failed: {:?}", status);
186 Err(status.into())
187 }
188 }
189 }
190
191 async fn make_leader_client(
192 &self
193 ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
194 let client_inner = self.client_inner.load();
195
196 let channel = client_inner.pool.get_leader();
197 let mut client = RaftClientServiceClient::new(channel);
198 if client_inner.pool.config.enable_compression {
199 client = client
200 .send_compressed(CompressionEncoding::Gzip)
201 .accept_compressed(CompressionEncoding::Gzip);
202 }
203
204 Ok(client)
205 }
206
207 pub(super) async fn make_client(
208 &self
209 ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
210 let client_inner = self.client_inner.load();
211
212 let mut rng = StdRng::from_entropy();
214 let channels = client_inner.pool.get_all_channels();
215 let i = rng.gen_range(0..channels.len());
216
217 let mut client = RaftClientServiceClient::new(channels[i].clone());
218
219 if client_inner.pool.config.enable_compression {
220 client = client
221 .send_compressed(CompressionEncoding::Gzip)
222 .accept_compressed(CompressionEncoding::Gzip);
223 }
224
225 Ok(client)
226 }
227}