simd_r_drive_ws_client/
ws_client.rs1use muxio_rpc_service_caller::{RpcServiceCallerInterface, prebuffered::RpcCallPrebuffered};
2use muxio_tokio_rpc_client::{RpcClient, RpcTransportState};
3use simd_r_drive::{
4 DataStore, EntryMetadata,
5 traits::{AsyncDataStoreReader, AsyncDataStoreWriter},
6};
7use simd_r_drive_muxio_service_definition::prebuffered::{
8 BatchRead, BatchReadRequestParams, BatchWrite, BatchWriteRequestParams, Delete,
9 DeleteRequestParams, Exists, ExistsRequestParams, FileSize, FileSizeRequestParams, IsEmpty,
10 IsEmptyRequestParams, Len, LenRequestParams, Read, ReadRequestParams, Write,
11 WriteRequestParams,
12};
13use std::io::Result;
14
15pub struct WsClient {
16 rpc_client: RpcClient,
17}
18
19impl WsClient {
20 pub async fn new(host: &str, port: u16) -> Result<Self> {
21 let rpc_client = RpcClient::new(host, port).await?;
22
23 Ok(Self { rpc_client })
24 }
25
26 pub fn set_state_change_handler(
29 &self,
30 handler: impl Fn(RpcTransportState) + Send + Sync + 'static,
31 ) {
32 self.rpc_client.set_state_change_handler(handler);
33 }
34}
35
36#[async_trait::async_trait]
37impl AsyncDataStoreWriter for WsClient {
38 async fn write_stream<R: std::io::Read>(&self, _key: &[u8], _reader: &mut R) -> Result<u64> {
39 unimplemented!("`write_stream` is not currently implemented");
40 }
41
42 async fn write_stream_with_key_hash<R: std::io::Read>(
43 &self,
44 _key_hash: u64,
45 _reader: &mut R,
46 ) -> Result<u64> {
47 unimplemented!("`write_stream_with_key_hash` is not currently implemented");
48 }
49
50 async fn write(&self, key: &[u8], payload: &[u8]) -> Result<u64> {
51 let response_params = Write::call(
52 &self.rpc_client,
53 WriteRequestParams {
54 key: key.to_vec(),
55 payload: payload.to_vec(),
56 },
57 )
58 .await?;
59
60 Ok(response_params.tail_offset)
61 }
62
63 async fn write_with_key_hash(&self, _key_hash: u64, _payload: &[u8]) -> Result<u64> {
64 unimplemented!("`write_with_key_hash` is not currently implemented");
65 }
66
67 async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result<u64> {
68 let response_params = BatchWrite::call(
69 &self.rpc_client,
70 BatchWriteRequestParams {
71 entries: entries
72 .iter()
73 .map(|(k, v)| (k.to_vec(), v.to_vec()))
74 .collect(),
75 },
76 )
77 .await?;
78
79 Ok(response_params.tail_offset)
80 }
81
82 async fn batch_write_with_key_hashes(
83 &self,
84 _prehashed_keys: Vec<(u64, &[u8])>,
85 _allow_null_bytes: bool,
86 ) -> Result<u64> {
87 unimplemented!("`batch_write_with_key_hashes` is not currently implemented");
88 }
89
90 async fn rename(&self, _old_key: &[u8], _new_key: &[u8]) -> Result<u64> {
91 unimplemented!("`rename` is not currently implemented");
92 }
93
94 async fn copy(&self, _key: &[u8], _target: &DataStore) -> Result<u64> {
95 unimplemented!("`copy` is not currently implemented");
96 }
97
98 async fn transfer(&self, _key: &[u8], _target: &DataStore) -> Result<u64> {
99 unimplemented!("`transfer` is not currently implemented");
100 }
101
102 async fn delete(&self, key: &[u8]) -> Result<u64> {
103 let resp =
104 Delete::call(&self.rpc_client, DeleteRequestParams { key: key.to_vec() }).await?;
105
106 Ok(resp.tail_offset)
107 }
108
109 async fn batch_delete(&self, _keys: &[&[u8]]) -> Result<u64> {
110 unimplemented!("`batch_delete` is not currently implemented");
111 }
112
113 async fn batch_delete_key_hashes(&self, _prehashed_keys: &[u64]) -> Result<u64> {
114 unimplemented!("`batch_delete_key_hashes` is not currently implemented");
115 }
116}
117
118#[async_trait::async_trait]
119impl AsyncDataStoreReader for WsClient {
120 type EntryHandleType = Vec<u8>;
122
123 async fn exists(&self, key: &[u8]) -> Result<bool> {
124 let response_params =
125 Exists::call(&self.rpc_client, ExistsRequestParams { key: key.to_vec() }).await?;
126
127 Ok(response_params.exists)
128 }
129
130 async fn exists_with_key_hash(&self, _prehashed_key: u64) -> Result<bool> {
131 unimplemented!("`exists_with_key_hash` is not currently implemented");
132 }
133
134 async fn read(&self, key: &[u8]) -> Result<Option<Self::EntryHandleType>> {
135 let response_params =
136 Read::call(&self.rpc_client, ReadRequestParams { key: key.to_vec() }).await?;
137
138 Ok(response_params.entry_payload)
139 }
140
141 async fn read_with_key_hash(
142 &self,
143 _prehashed_key: u64,
144 ) -> Result<Option<Self::EntryHandleType>> {
145 unimplemented!("`read_with_key_hash` is not currently implemented");
146 }
147
148 async fn read_last_entry(&self) -> Result<Option<Self::EntryHandleType>> {
149 unimplemented!("`read_last_entry` is not currently implemented");
150 }
151
152 async fn batch_read(&self, keys: &[&[u8]]) -> Result<Vec<Option<Self::EntryHandleType>>> {
153 let batch_read_result = BatchRead::call(
154 &self.rpc_client,
155 BatchReadRequestParams {
156 keys: keys.iter().map(|key| key.to_vec()).collect(),
157 },
158 )
159 .await?;
160
161 Ok(batch_read_result.entries_payloads)
162 }
163
164 async fn batch_read_hashed_keys(
165 &self,
166 _prehashed_keys: &[u64],
167 _non_hashed_keys: Option<&[&[u8]]>,
168 ) -> Result<Vec<Option<Self::EntryHandleType>>> {
169 unimplemented!("`batch_read_hashed_keys` is not currently implemented");
170 }
171
172 async fn read_metadata(&self, _key: &[u8]) -> Result<Option<EntryMetadata>> {
173 unimplemented!("`read_metadata` is not currently implemented");
174 }
175
176 async fn len(&self) -> Result<usize> {
177 let response_params = Len::call(&self.rpc_client, LenRequestParams {}).await?;
178
179 Ok(response_params.total_entries)
180 }
181
182 async fn is_empty(&self) -> Result<bool> {
183 let response_params = IsEmpty::call(&self.rpc_client, IsEmptyRequestParams {}).await?;
184
185 Ok(response_params.is_empty)
186 }
187
188 async fn file_size(&self) -> Result<u64> {
189 let response_params = FileSize::call(&self.rpc_client, FileSizeRequestParams {}).await?;
190
191 Ok(response_params.file_size)
192 }
193}