simd_r_drive_ws_client/
ws_client.rs

1use muxio_rpc_service_caller::{RpcServiceCallerInterface, prebuffered::RpcCallPrebuffered};
2use muxio_tokio_rpc_client::{RpcClient, RpcTransportState};
3use simd_r_drive::{
4    DataStore, EntryMetadata,
5    traits::{AsyncDataStoreReader, AsyncDataStoreWriter},
6};
7use simd_r_drive_muxio_service_definition::prebuffered::{
8    BatchRead, BatchReadRequestParams, BatchWrite, BatchWriteRequestParams, Delete,
9    DeleteRequestParams, Exists, ExistsRequestParams, FileSize, FileSizeRequestParams, IsEmpty,
10    IsEmptyRequestParams, Len, LenRequestParams, Read, ReadRequestParams, Write,
11    WriteRequestParams,
12};
13use std::io::Result;
14
15pub struct WsClient {
16    rpc_client: RpcClient,
17}
18
19impl WsClient {
20    pub async fn new(host: &str, port: u16) -> Result<Self> {
21        let rpc_client = RpcClient::new(host, port).await?;
22
23        Ok(Self { rpc_client })
24    }
25
26    /// Sets a callback that will be invoked with the current `RpcTransportState`
27    /// whenever the WebSocket connection status changes.
28    pub fn set_state_change_handler(
29        &self,
30        handler: impl Fn(RpcTransportState) + Send + Sync + 'static,
31    ) {
32        self.rpc_client.set_state_change_handler(handler);
33    }
34}
35
36#[async_trait::async_trait]
37impl AsyncDataStoreWriter for WsClient {
38    async fn write_stream<R: std::io::Read>(&self, _key: &[u8], _reader: &mut R) -> Result<u64> {
39        unimplemented!("`write_stream` is not currently implemented");
40    }
41
42    async fn write_stream_with_key_hash<R: std::io::Read>(
43        &self,
44        _key_hash: u64,
45        _reader: &mut R,
46    ) -> Result<u64> {
47        unimplemented!("`write_stream_with_key_hash` is not currently implemented");
48    }
49
50    async fn write(&self, key: &[u8], payload: &[u8]) -> Result<u64> {
51        let response_params = Write::call(
52            &self.rpc_client,
53            WriteRequestParams {
54                key: key.to_vec(),
55                payload: payload.to_vec(),
56            },
57        )
58        .await?;
59
60        Ok(response_params.tail_offset)
61    }
62
63    async fn write_with_key_hash(&self, _key_hash: u64, _payload: &[u8]) -> Result<u64> {
64        unimplemented!("`write_with_key_hash` is not currently implemented");
65    }
66
67    async fn batch_write(&self, entries: &[(&[u8], &[u8])]) -> Result<u64> {
68        let response_params = BatchWrite::call(
69            &self.rpc_client,
70            BatchWriteRequestParams {
71                entries: entries
72                    .iter()
73                    .map(|(k, v)| (k.to_vec(), v.to_vec()))
74                    .collect(),
75            },
76        )
77        .await?;
78
79        Ok(response_params.tail_offset)
80    }
81
82    async fn batch_write_with_key_hashes(
83        &self,
84        _prehashed_keys: Vec<(u64, &[u8])>,
85        _allow_null_bytes: bool,
86    ) -> Result<u64> {
87        unimplemented!("`batch_write_with_key_hashes` is not currently implemented");
88    }
89
90    async fn rename(&self, _old_key: &[u8], _new_key: &[u8]) -> Result<u64> {
91        unimplemented!("`rename` is not currently implemented");
92    }
93
94    async fn copy(&self, _key: &[u8], _target: &DataStore) -> Result<u64> {
95        unimplemented!("`copy` is not currently implemented");
96    }
97
98    async fn transfer(&self, _key: &[u8], _target: &DataStore) -> Result<u64> {
99        unimplemented!("`transfer` is not currently implemented");
100    }
101
102    async fn delete(&self, key: &[u8]) -> Result<u64> {
103        let resp =
104            Delete::call(&self.rpc_client, DeleteRequestParams { key: key.to_vec() }).await?;
105
106        Ok(resp.tail_offset)
107    }
108
109    async fn batch_delete(&self, _keys: &[&[u8]]) -> Result<u64> {
110        unimplemented!("`batch_delete` is not currently implemented");
111    }
112
113    async fn batch_delete_key_hashes(&self, _prehashed_keys: &[u64]) -> Result<u64> {
114        unimplemented!("`batch_delete_key_hashes` is not currently implemented");
115    }
116}
117
118#[async_trait::async_trait]
119impl AsyncDataStoreReader for WsClient {
120    // FIXME: This is a workaround until properly implementing a stream-able handle
121    type EntryHandleType = Vec<u8>;
122
123    async fn exists(&self, key: &[u8]) -> Result<bool> {
124        let response_params =
125            Exists::call(&self.rpc_client, ExistsRequestParams { key: key.to_vec() }).await?;
126
127        Ok(response_params.exists)
128    }
129
130    async fn exists_with_key_hash(&self, _prehashed_key: u64) -> Result<bool> {
131        unimplemented!("`exists_with_key_hash` is not currently implemented");
132    }
133
134    async fn read(&self, key: &[u8]) -> Result<Option<Self::EntryHandleType>> {
135        let response_params =
136            Read::call(&self.rpc_client, ReadRequestParams { key: key.to_vec() }).await?;
137
138        Ok(response_params.entry_payload)
139    }
140
141    async fn read_with_key_hash(
142        &self,
143        _prehashed_key: u64,
144    ) -> Result<Option<Self::EntryHandleType>> {
145        unimplemented!("`read_with_key_hash` is not currently implemented");
146    }
147
148    async fn read_last_entry(&self) -> Result<Option<Self::EntryHandleType>> {
149        unimplemented!("`read_last_entry` is not currently implemented");
150    }
151
152    async fn batch_read(&self, keys: &[&[u8]]) -> Result<Vec<Option<Self::EntryHandleType>>> {
153        let batch_read_result = BatchRead::call(
154            &self.rpc_client,
155            BatchReadRequestParams {
156                keys: keys.iter().map(|key| key.to_vec()).collect(),
157            },
158        )
159        .await?;
160
161        Ok(batch_read_result.entries_payloads)
162    }
163
164    async fn batch_read_hashed_keys(
165        &self,
166        _prehashed_keys: &[u64],
167        _non_hashed_keys: Option<&[&[u8]]>,
168    ) -> Result<Vec<Option<Self::EntryHandleType>>> {
169        unimplemented!("`batch_read_hashed_keys` is not currently implemented");
170    }
171
172    async fn read_metadata(&self, _key: &[u8]) -> Result<Option<EntryMetadata>> {
173        unimplemented!("`read_metadata` is not currently implemented");
174    }
175
176    async fn len(&self) -> Result<usize> {
177        let response_params = Len::call(&self.rpc_client, LenRequestParams {}).await?;
178
179        Ok(response_params.total_entries)
180    }
181
182    async fn is_empty(&self) -> Result<bool> {
183        let response_params = IsEmpty::call(&self.rpc_client, IsEmptyRequestParams {}).await?;
184
185        Ok(response_params.is_empty)
186    }
187
188    async fn file_size(&self) -> Result<u64> {
189        let response_params = FileSize::call(&self.rpc_client, FileSizeRequestParams {}).await?;
190
191        Ok(response_params.file_size)
192    }
193}