1use std::convert::TryFrom;
2
3use crate::api::key_value::{
4 CacheBoolResp, CacheDataObjectResp, CachePairsResp, CacheReq, CacheSizeResp,
5};
6use crate::cache::AtomicityMode::{Atomic, Transactional};
7use crate::cache::CacheMode::{Local, Partitioned, Replicated};
8use crate::cache::IndexType::{Fulltext, GeoSpatial, Sorted};
9use crate::cache::PartitionLossPolicy::{
10 Ignore, ReadOnlyAll, ReadOnlySafe, ReadWriteAll, ReadWriteSafe,
11};
12use crate::cache::RebalanceMode::Async;
13use crate::cache::WriteSynchronizationMode::{FullAsync, FullSync, PrimarySync};
14use crate::error::{IgniteError, IgniteResult};
15
16use crate::api::OpCode;
17use crate::connection::Connection;
18use crate::{ReadableType, WritableType};
19use std::marker::PhantomData;
20use std::sync::Arc;
21
/// Atomicity mode of a cache.
///
/// Every variant has an explicit discriminant; the `TryFrom<i32>` impl for
/// this type maps the same integer codes back to variants.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AtomicityMode {
    /// Integer code `0`.
    Transactional = 0,
    /// Integer code `1`.
    Atomic = 1,
}
27
28impl TryFrom<i32> for AtomicityMode {
29 type Error = IgniteError;
30
31 fn try_from(value: i32) -> Result<Self, Self::Error> {
32 match value {
33 0 => Ok(Transactional),
34 1 => Ok(Atomic),
35 _ => Err(IgniteError::from("Cannot read AtomicityMode")),
36 }
37 }
38}
39
/// Mode of a cache.
///
/// Every variant has an explicit discriminant; the `TryFrom<i32>` impl for
/// this type maps the same integer codes back to variants.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CacheMode {
    /// Integer code `0`.
    Local = 0,
    /// Integer code `1`.
    Replicated = 1,
    /// Integer code `2`.
    Partitioned = 2,
}
46
47impl TryFrom<i32> for CacheMode {
48 type Error = IgniteError;
49
50 fn try_from(value: i32) -> Result<Self, Self::Error> {
51 match value {
52 0 => Ok(Local),
53 1 => Ok(Replicated),
54 2 => Ok(Partitioned),
55 _ => Err(IgniteError::from("Cannot read CacheMode")),
56 }
57 }
58}
59
/// Partition loss policy of a cache.
///
/// Every variant has an explicit discriminant; the `TryFrom<i32>` impl for
/// this type maps the same integer codes back to variants.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PartitionLossPolicy {
    /// Integer code `0`.
    ReadOnlySafe = 0,
    /// Integer code `1`.
    ReadOnlyAll = 1,
    /// Integer code `2`.
    ReadWriteSafe = 2,
    /// Integer code `3`.
    ReadWriteAll = 3,
    /// Integer code `4`.
    Ignore = 4,
}
68
69impl TryFrom<i32> for PartitionLossPolicy {
70 type Error = IgniteError;
71
72 fn try_from(value: i32) -> Result<Self, Self::Error> {
73 match value {
74 0 => Ok(ReadOnlySafe),
75 1 => Ok(ReadOnlyAll),
76 2 => Ok(ReadWriteSafe),
77 3 => Ok(ReadWriteAll),
78 4 => Ok(Ignore),
79 _ => Err(IgniteError::from("Cannot read PartitionLossPolicy")),
80 }
81 }
82}
83
/// Rebalance mode of a cache.
///
/// Every variant has an explicit discriminant; the `TryFrom<i32>` impl for
/// this type maps the same integer codes back to variants.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
/// `Sync` and `None` share their names with prelude items, so refer to them
/// by their qualified paths (`RebalanceMode::Sync`, `RebalanceMode::None`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RebalanceMode {
    /// Integer code `0`.
    Sync = 0,
    /// Integer code `1`.
    Async = 1,
    /// Integer code `2`.
    None = 2,
}
90
91impl TryFrom<i32> for RebalanceMode {
92 type Error = IgniteError;
93
94 fn try_from(value: i32) -> Result<Self, Self::Error> {
95 match value {
96 0 => Ok(RebalanceMode::Sync),
97 1 => Ok(Async),
98 2 => Ok(RebalanceMode::None),
99 _ => Err(IgniteError::from("Cannot read RebalanceMode")),
100 }
101 }
102}
103
/// Write synchronization mode of a cache.
///
/// Every variant has an explicit discriminant; the `TryFrom<i32>` impl for
/// this type maps the same integer codes back to variants.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WriteSynchronizationMode {
    /// Integer code `0`.
    FullSync = 0,
    /// Integer code `1`.
    FullAsync = 1,
    /// Integer code `2`.
    PrimarySync = 2,
}
110
111impl TryFrom<i32> for WriteSynchronizationMode {
112 type Error = IgniteError;
113
114 fn try_from(value: i32) -> Result<Self, Self::Error> {
115 match value {
116 0 => Ok(FullSync),
117 1 => Ok(FullAsync),
118 2 => Ok(PrimarySync),
119 _ => Err(IgniteError::from("Cannot read WriteSynchronizationMode")),
120 }
121 }
122}
123
/// Peek mode passed to the cache size requests.
///
/// Every variant has an explicit discriminant, which is the `u8` value the
/// type converts into.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CachePeekMode {
    /// Code `0`.
    All = 0,
    /// Code `1`.
    Near = 1,
    /// Code `2`.
    Primary = 2,
    /// Code `3`.
    Backup = 3,
}
131
132impl Into<u8> for CachePeekMode {
133 fn into(self) -> u8 {
134 self as u8
135 }
136}
137
/// Type of a query index.
///
/// Every variant has an explicit discriminant; the `TryFrom<u8>` impl for
/// this type maps the same codes back to variants.
///
/// The enum is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IndexType {
    /// Code `0`.
    Sorted = 0,
    /// Code `1`.
    Fulltext = 1,
    /// Code `2`.
    GeoSpatial = 2,
}
144
145impl TryFrom<u8> for IndexType {
146 type Error = IgniteError;
147
148 fn try_from(value: u8) -> Result<Self, Self::Error> {
149 match value {
150 0 => Ok(Sorted),
151 1 => Ok(Fulltext),
152 2 => Ok(GeoSpatial),
153 _ => Err(IgniteError::from("Cannot read IndexType")),
154 }
155 }
156}
157
/// Settings describing a cache.
///
/// [`CacheConfiguration::new`] sets `name` and fills every other field from a
/// private baseline; the per-field comments quote that baseline value.
///
/// NOTE(review): field meanings presumably follow the Apache Ignite cache
/// configuration properties of the same names — confirm against the protocol
/// documentation.
#[derive(Clone, Debug)]
pub struct CacheConfiguration {
    /// Baseline: `AtomicityMode::Atomic`.
    pub atomicity_mode: AtomicityMode,
    /// Baseline: `0`.
    pub num_backup: i32,
    /// Baseline: `CacheMode::Partitioned`.
    pub cache_mode: CacheMode,
    /// Baseline: `true`.
    pub copy_on_read: bool,
    /// Baseline: `None`.
    pub data_region_name: Option<String>,
    /// Baseline: `true`.
    pub eager_ttl: bool,
    /// Baseline: `true`.
    pub statistics_enabled: bool,
    /// Baseline: `None`.
    pub group_name: Option<String>,
    /// Baseline: `0`. Presumably milliseconds, going by the `_ms` suffix.
    pub default_lock_timeout_ms: i64,
    /// Baseline: `500`.
    pub max_concurrent_async_operations: i32,
    /// Baseline: `1024`.
    pub max_query_iterators: i32,
    /// Cache name; the only field `CacheConfiguration::new` takes from its caller.
    pub name: String,
    /// Baseline: `false`.
    pub onheap_cache_enabled: bool,
    /// Baseline: `PartitionLossPolicy::Ignore`.
    pub partition_loss_policy: PartitionLossPolicy,
    /// Baseline: `0`.
    pub query_detail_metrics_size: i32,
    /// Baseline: `1`.
    pub query_parallelism: i32,
    /// Baseline: `true`.
    pub read_from_backup: bool,
    /// Baseline: `512 * 1024`.
    pub rebalance_batch_size: i32,
    /// Baseline: `2`.
    pub rebalance_batches_prefetch_count: i64,
    /// Baseline: `0`. Presumably milliseconds, going by the `_ms` suffix.
    pub rebalance_delay_ms: i64,
    /// Baseline: `RebalanceMode::Async`.
    pub rebalance_mode: RebalanceMode,
    /// Baseline: `0`.
    pub rebalance_order: i32,
    /// Baseline: `0`. Presumably milliseconds, going by the `_ms` suffix.
    pub rebalance_throttle_ms: i64,
    /// Baseline: `10000`. Presumably milliseconds, going by the `_ms` suffix.
    pub rebalance_timeout_ms: i64,
    /// Baseline: `false`.
    pub sql_escape_all: bool,
    /// Baseline: `-1`.
    pub sql_index_max_size: i32,
    /// Baseline: `None`.
    pub sql_schema: Option<String>,
    /// Baseline: `WriteSynchronizationMode::PrimarySync`.
    pub write_synchronization_mode: WriteSynchronizationMode,
    /// Baseline: `None`.
    pub cache_key_configurations: Option<Vec<CacheKeyConfiguration>>,
    /// Baseline: `None`.
    pub query_entities: Option<Vec<QueryEntity>>,
}
191
192impl CacheConfiguration {
193 pub fn new(name: &str) -> CacheConfiguration {
194 CacheConfiguration {
195 name: name.to_owned(),
196 ..Self::default()
197 }
198 }
199
200 fn default() -> CacheConfiguration {
201 CacheConfiguration {
202 atomicity_mode: AtomicityMode::Atomic,
203 num_backup: 0,
204 cache_mode: CacheMode::Partitioned,
205 copy_on_read: true,
206 data_region_name: None,
207 eager_ttl: true,
208 statistics_enabled: true,
209 group_name: None,
210 default_lock_timeout_ms: 0,
211 max_concurrent_async_operations: 500,
212 max_query_iterators: 1024,
213 name: String::new(),
214 onheap_cache_enabled: false,
215 partition_loss_policy: PartitionLossPolicy::Ignore,
216 query_detail_metrics_size: 0,
217 query_parallelism: 1,
218 read_from_backup: true,
219 rebalance_batch_size: 512 * 1024, rebalance_batches_prefetch_count: 2,
221 rebalance_delay_ms: 0,
222 rebalance_mode: RebalanceMode::Async,
223 rebalance_order: 0,
224 rebalance_throttle_ms: 0,
225 rebalance_timeout_ms: 10000, sql_escape_all: false,
227 sql_index_max_size: -1,
228 sql_schema: None,
229 write_synchronization_mode: WriteSynchronizationMode::PrimarySync,
230 cache_key_configurations: None,
231 query_entities: None,
232 }
233 }
234}
235
/// Pairs a type name with the name of its affinity key field.
///
/// Plain data made of two strings, so it is comparable with `==`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CacheKeyConfiguration {
    /// Name of the type this entry applies to.
    pub type_name: String,
    /// Name of the affinity key field of that type.
    pub affinity_key_field_name: String,
}
241
/// Description of one query entity attached to a cache configuration
/// (`CacheConfiguration::query_entities` holds a list of these).
///
/// All fields are crate-private.
///
/// NOTE(review): presumably mirrors the Apache Ignite `QueryEntity`
/// structure — confirm field semantics against the protocol documentation.
#[derive(Clone, Debug)]
pub struct QueryEntity {
    pub(crate) key_type: String,
    pub(crate) value_type: String,
    pub(crate) table: String,
    pub(crate) key_field: String,
    pub(crate) value_field: String,
    /// Fields of the entity, one `QueryField` each.
    pub(crate) query_fields: Vec<QueryField>,
    /// Pairs of strings; presumably (field name, alias) — TODO confirm order.
    pub(crate) field_aliases: Vec<(String, String)>,
    /// Indexes of the entity, one `QueryIndex` each.
    pub(crate) query_indexes: Vec<QueryIndex>,
    pub(crate) default_value: Option<String>,
}
254
/// One field of a `QueryEntity`. All fields are crate-private.
#[derive(Clone, Debug)]
pub struct QueryField {
    pub(crate) name: String,
    pub(crate) type_name: String,
    /// Presumably marks the field as part of the key — TODO confirm.
    pub(crate) key_field: bool,
    /// Presumably marks the field as `NOT NULL` — TODO confirm.
    pub(crate) not_null_constraint: bool,
}
262
/// One index of a `QueryEntity`. All fields are crate-private.
#[derive(Clone, Debug)]
pub struct QueryIndex {
    pub(crate) index_name: String,
    /// Kind of index; see `IndexType`.
    pub(crate) index_type: IndexType,
    pub(crate) inline_size: i32,
    /// (name, flag) pairs; the flag presumably selects descending order —
    /// TODO confirm.
    pub(crate) fields: Vec<(String, bool)>,
}
270
/// Typed handle to one cache, bound to a shared `Connection`.
///
/// `K` and `V` fix the key and value types used by the handle's methods. They
/// are carried only as `PhantomData` markers; no key or value is stored here.
pub struct Cache<K: WritableType + ReadableType, V: WritableType + ReadableType> {
    /// Numeric cache id, passed as the first argument of every `CacheReq`.
    id: i32,
    /// Name the handle was created with.
    pub _name: String,
    /// Connection used for every request, shared through an `Arc`.
    conn: Arc<Connection>,
    /// Marker tying the handle to its key type.
    k_phantom: PhantomData<K>,
    /// Marker tying the handle to its value type.
    v_phantom: PhantomData<V>,
}
281
282impl<K: WritableType + ReadableType, V: WritableType + ReadableType> Cache<K, V> {
283 pub(crate) fn new(id: i32, name: String, conn: Arc<Connection>) -> Cache<K, V> {
284 Cache {
285 id,
286 _name: name,
287 conn,
288 k_phantom: PhantomData,
289 v_phantom: PhantomData,
290 }
291 }
292
293 pub fn get(&self, key: &K) -> IgniteResult<Option<V>> {
294 self.conn
295 .send_and_read(OpCode::CacheGet, CacheReq::Get::<K, V>(self.id, key))
296 .map(|resp: CacheDataObjectResp<V>| resp.val)
297 }
298
299 pub fn get_all(&self, keys: &[K]) -> IgniteResult<Vec<(Option<K>, Option<V>)>> {
300 self.conn
301 .send_and_read(OpCode::CacheGetAll, CacheReq::GetAll::<K, V>(self.id, keys))
302 .map(|resp: CachePairsResp<K, V>| resp.val)
303 }
304
305 pub fn put(&self, key: &K, value: &V) -> IgniteResult<()> {
306 self.conn
307 .send(OpCode::CachePut, CacheReq::Put::<K, V>(self.id, key, value))
308 }
309
310 pub fn put_all(&self, pairs: &[(K, V)]) -> IgniteResult<()> {
311 self.conn.send(
312 OpCode::CachePutAll,
313 CacheReq::PutAll::<K, V>(self.id, pairs),
314 )
315 }
316
317 pub fn contains_key(&self, key: &K) -> IgniteResult<bool> {
318 self.conn
319 .send_and_read(
320 OpCode::CacheContainsKey,
321 CacheReq::ContainsKey::<K, V>(self.id, key),
322 )
323 .map(|resp: CacheBoolResp| resp.flag)
324 }
325
326 pub fn contains_keys(&self, keys: &[K]) -> IgniteResult<bool> {
327 self.conn
328 .send_and_read(
329 OpCode::CacheContainsKeys,
330 CacheReq::ContainsKeys::<K, V>(self.id, keys),
331 )
332 .map(|resp: CacheBoolResp| resp.flag)
333 }
334
335 pub fn get_and_put(&self, key: &K, value: &V) -> IgniteResult<Option<V>> {
336 self.conn
337 .send_and_read(
338 OpCode::CacheGetAndPut,
339 CacheReq::GetAndPut::<K, V>(self.id, key, value),
340 )
341 .map(|resp: CacheDataObjectResp<V>| resp.val)
342 }
343
344 pub fn get_and_replace(&self, key: &K, value: &V) -> IgniteResult<Option<V>> {
345 self.conn
346 .send_and_read(
347 OpCode::CacheGetAndReplace,
348 CacheReq::GetAndReplace::<K, V>(self.id, key, value),
349 )
350 .map(|resp: CacheDataObjectResp<V>| resp.val)
351 }
352
353 pub fn get_and_remove(&self, key: &K) -> IgniteResult<Option<V>> {
354 self.conn
355 .send_and_read(
356 OpCode::CacheGetAndRemove,
357 CacheReq::GetAndRemove::<K, V>(self.id, key),
358 )
359 .map(|resp: CacheDataObjectResp<V>| resp.val)
360 }
361
362 pub fn put_if_absent(&self, key: &K, value: &V) -> IgniteResult<bool> {
363 self.conn
364 .send_and_read(
365 OpCode::CachePutIfAbsent,
366 CacheReq::PutIfAbsent::<K, V>(self.id, key, value),
367 )
368 .map(|resp: CacheBoolResp| resp.flag)
369 }
370
371 pub fn get_and_put_if_absent(&self, key: &K, value: &V) -> IgniteResult<Option<V>> {
372 self.conn
373 .send_and_read(
374 OpCode::CacheGetAndPutIfAbsent,
375 CacheReq::GetAndPutIfAbsent::<K, V>(self.id, key, value),
376 )
377 .map(|resp: CacheDataObjectResp<V>| resp.val)
378 }
379
380 pub fn replace(&self, key: &K, value: &V) -> IgniteResult<bool> {
381 self.conn
382 .send_and_read(
383 OpCode::CacheReplace,
384 CacheReq::Replace::<K, V>(self.id, key, value),
385 )
386 .map(|resp: CacheBoolResp| resp.flag)
387 }
388
389 pub fn replace_if_equals(&self, key: &K, old: &V, new: &V) -> IgniteResult<bool> {
390 self.conn
391 .send_and_read(
392 OpCode::CacheReplaceIfEquals,
393 CacheReq::ReplaceIfEquals::<K, V>(self.id, key, old, new),
394 )
395 .map(|resp: CacheBoolResp| resp.flag)
396 }
397
398 pub fn clear(&self) -> IgniteResult<()> {
399 self.conn
400 .send(OpCode::CacheClear, CacheReq::Clear::<K, V>(self.id))
401 }
402
403 pub fn clear_key(&self, key: &K) -> IgniteResult<()> {
404 self.conn.send(
405 OpCode::CacheClearKey,
406 CacheReq::ClearKey::<K, V>(self.id, key),
407 )
408 }
409
410 pub fn clear_keys(&self, keys: &[K]) -> IgniteResult<()> {
411 self.conn.send(
412 OpCode::CacheClearKeys,
413 CacheReq::ClearKeys::<K, V>(self.id, keys),
414 )
415 }
416
417 pub fn remove_key(&self, key: &K) -> IgniteResult<bool> {
418 self.conn
419 .send_and_read(
420 OpCode::CacheRemoveKey,
421 CacheReq::RemoveKey::<K, V>(self.id, key),
422 )
423 .map(|resp: CacheBoolResp| resp.flag)
424 }
425
426 pub fn remove_if_equals(&self, key: &K, value: &V) -> IgniteResult<bool> {
427 self.conn
428 .send_and_read(
429 OpCode::CacheRemoveIfEquals,
430 CacheReq::RemoveIfEquals::<K, V>(self.id, key, value),
431 )
432 .map(|resp: CacheBoolResp| resp.flag)
433 }
434
435 pub fn get_size(&self) -> IgniteResult<i64> {
436 let modes = Vec::new();
437 self.conn
438 .send_and_read(
439 OpCode::CacheGetSize,
440 CacheReq::GetSize::<K, V>(self.id, modes),
441 )
442 .map(|resp: CacheSizeResp| resp.size)
443 }
444
445 pub fn get_size_peek_mode(&self, mode: CachePeekMode) -> IgniteResult<i64> {
446 let modes = vec![mode];
447 self.conn
448 .send_and_read(
449 OpCode::CacheGetSize,
450 CacheReq::GetSize::<K, V>(self.id, modes),
451 )
452 .map(|resp: CacheSizeResp| resp.size)
453 }
454
455 pub fn get_size_peek_modes(&self, modes: Vec<CachePeekMode>) -> IgniteResult<i64> {
456 self.conn
457 .send_and_read(
458 OpCode::CacheGetSize,
459 CacheReq::GetSize::<K, V>(self.id, modes),
460 )
461 .map(|resp: CacheSizeResp| resp.size)
462 }
463
464 pub fn remove_keys(&self, keys: &[K]) -> IgniteResult<()> {
465 self.conn.send(
466 OpCode::CacheRemoveKeys,
467 CacheReq::RemoveKeys::<K, V>(self.id, keys),
468 )
469 }
470
471 pub fn remove_all(&self) -> IgniteResult<()> {
472 self.conn
473 .send(OpCode::CacheRemoveAll, CacheReq::RemoveAll::<K, V>(self.id))
474 }
475}