ignite_rs/
cache.rs

1use std::convert::TryFrom;
2
3use crate::api::key_value::{
4    CacheBoolResp, CacheDataObjectResp, CachePairsResp, CacheReq, CacheSizeResp,
5};
6use crate::cache::AtomicityMode::{Atomic, Transactional};
7use crate::cache::CacheMode::{Local, Partitioned, Replicated};
8use crate::cache::IndexType::{Fulltext, GeoSpatial, Sorted};
9use crate::cache::PartitionLossPolicy::{
10    Ignore, ReadOnlyAll, ReadOnlySafe, ReadWriteAll, ReadWriteSafe,
11};
12use crate::cache::RebalanceMode::Async;
13use crate::cache::WriteSynchronizationMode::{FullAsync, FullSync, PrimarySync};
14use crate::error::{IgniteError, IgniteResult};
15
16use crate::api::OpCode;
17use crate::connection::Connection;
18use crate::{ReadableType, WritableType};
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22#[derive(Clone, Debug)]
23pub enum AtomicityMode {
24    Transactional = 0,
25    Atomic = 1,
26}
27
28impl TryFrom<i32> for AtomicityMode {
29    type Error = IgniteError;
30
31    fn try_from(value: i32) -> Result<Self, Self::Error> {
32        match value {
33            0 => Ok(Transactional),
34            1 => Ok(Atomic),
35            _ => Err(IgniteError::from("Cannot read AtomicityMode")),
36        }
37    }
38}
39
40#[derive(Clone, Debug)]
41pub enum CacheMode {
42    Local = 0,
43    Replicated = 1,
44    Partitioned = 2,
45}
46
47impl TryFrom<i32> for CacheMode {
48    type Error = IgniteError;
49
50    fn try_from(value: i32) -> Result<Self, Self::Error> {
51        match value {
52            0 => Ok(Local),
53            1 => Ok(Replicated),
54            2 => Ok(Partitioned),
55            _ => Err(IgniteError::from("Cannot read CacheMode")),
56        }
57    }
58}
59
60#[derive(Clone, Debug)]
61pub enum PartitionLossPolicy {
62    ReadOnlySafe = 0,
63    ReadOnlyAll = 1,
64    ReadWriteSafe = 2,
65    ReadWriteAll = 3,
66    Ignore = 4,
67}
68
69impl TryFrom<i32> for PartitionLossPolicy {
70    type Error = IgniteError;
71
72    fn try_from(value: i32) -> Result<Self, Self::Error> {
73        match value {
74            0 => Ok(ReadOnlySafe),
75            1 => Ok(ReadOnlyAll),
76            2 => Ok(ReadWriteSafe),
77            3 => Ok(ReadWriteAll),
78            4 => Ok(Ignore),
79            _ => Err(IgniteError::from("Cannot read PartitionLossPolicy")),
80        }
81    }
82}
83
84#[derive(Clone, Debug)]
85pub enum RebalanceMode {
86    Sync = 0,
87    Async = 1,
88    None = 2,
89}
90
91impl TryFrom<i32> for RebalanceMode {
92    type Error = IgniteError;
93
94    fn try_from(value: i32) -> Result<Self, Self::Error> {
95        match value {
96            0 => Ok(RebalanceMode::Sync),
97            1 => Ok(Async),
98            2 => Ok(RebalanceMode::None),
99            _ => Err(IgniteError::from("Cannot read RebalanceMode")),
100        }
101    }
102}
103
104#[derive(Clone, Debug)]
105pub enum WriteSynchronizationMode {
106    FullSync = 0,
107    FullAsync = 1,
108    PrimarySync = 2,
109}
110
111impl TryFrom<i32> for WriteSynchronizationMode {
112    type Error = IgniteError;
113
114    fn try_from(value: i32) -> Result<Self, Self::Error> {
115        match value {
116            0 => Ok(FullSync),
117            1 => Ok(FullAsync),
118            2 => Ok(PrimarySync),
119            _ => Err(IgniteError::from("Cannot read WriteSynchronizationMode")),
120        }
121    }
122}
123
124#[derive(Clone, Debug)]
125pub enum CachePeekMode {
126    All = 0,
127    Near = 1,
128    Primary = 2,
129    Backup = 3,
130}
131
132impl Into<u8> for CachePeekMode {
133    fn into(self) -> u8 {
134        self as u8
135    }
136}
137
138#[derive(Clone, Debug)]
139pub enum IndexType {
140    Sorted = 0,
141    Fulltext = 1,
142    GeoSpatial = 2,
143}
144
145impl TryFrom<u8> for IndexType {
146    type Error = IgniteError;
147
148    fn try_from(value: u8) -> Result<Self, Self::Error> {
149        match value {
150            0 => Ok(Sorted),
151            1 => Ok(Fulltext),
152            2 => Ok(GeoSpatial),
153            _ => Err(IgniteError::from("Cannot read IndexType")),
154        }
155    }
156}
157
158#[derive(Clone, Debug)]
159pub struct CacheConfiguration {
160    pub atomicity_mode: AtomicityMode,
161    pub num_backup: i32,
162    pub cache_mode: CacheMode,
163    pub copy_on_read: bool,
164    pub data_region_name: Option<String>,
165    pub eager_ttl: bool,
166    pub statistics_enabled: bool,
167    pub group_name: Option<String>,
168    pub default_lock_timeout_ms: i64,
169    pub max_concurrent_async_operations: i32,
170    pub max_query_iterators: i32,
171    pub name: String,
172    pub onheap_cache_enabled: bool,
173    pub partition_loss_policy: PartitionLossPolicy,
174    pub query_detail_metrics_size: i32,
175    pub query_parallelism: i32,
176    pub read_from_backup: bool,
177    pub rebalance_batch_size: i32,
178    pub rebalance_batches_prefetch_count: i64,
179    pub rebalance_delay_ms: i64,
180    pub rebalance_mode: RebalanceMode,
181    pub rebalance_order: i32,
182    pub rebalance_throttle_ms: i64,
183    pub rebalance_timeout_ms: i64,
184    pub sql_escape_all: bool,
185    pub sql_index_max_size: i32,
186    pub sql_schema: Option<String>,
187    pub write_synchronization_mode: WriteSynchronizationMode,
188    pub cache_key_configurations: Option<Vec<CacheKeyConfiguration>>,
189    pub query_entities: Option<Vec<QueryEntity>>,
190}
191
192impl CacheConfiguration {
193    pub fn new(name: &str) -> CacheConfiguration {
194        CacheConfiguration {
195            name: name.to_owned(),
196            ..Self::default()
197        }
198    }
199
200    fn default() -> CacheConfiguration {
201        CacheConfiguration {
202            atomicity_mode: AtomicityMode::Atomic,
203            num_backup: 0,
204            cache_mode: CacheMode::Partitioned,
205            copy_on_read: true,
206            data_region_name: None,
207            eager_ttl: true,
208            statistics_enabled: true,
209            group_name: None,
210            default_lock_timeout_ms: 0,
211            max_concurrent_async_operations: 500,
212            max_query_iterators: 1024,
213            name: String::new(),
214            onheap_cache_enabled: false,
215            partition_loss_policy: PartitionLossPolicy::Ignore,
216            query_detail_metrics_size: 0,
217            query_parallelism: 1,
218            read_from_backup: true,
219            rebalance_batch_size: 512 * 1024, //512K
220            rebalance_batches_prefetch_count: 2,
221            rebalance_delay_ms: 0,
222            rebalance_mode: RebalanceMode::Async,
223            rebalance_order: 0,
224            rebalance_throttle_ms: 0,
225            rebalance_timeout_ms: 10000, //1sec
226            sql_escape_all: false,
227            sql_index_max_size: -1,
228            sql_schema: None,
229            write_synchronization_mode: WriteSynchronizationMode::PrimarySync,
230            cache_key_configurations: None,
231            query_entities: None,
232        }
233    }
234}
235
236#[derive(Clone, Debug)]
237pub struct CacheKeyConfiguration {
238    pub type_name: String,
239    pub affinity_key_field_name: String,
240}
241
242#[derive(Clone, Debug)]
243pub struct QueryEntity {
244    pub(crate) key_type: String,
245    pub(crate) value_type: String,
246    pub(crate) table: String,
247    pub(crate) key_field: String,
248    pub(crate) value_field: String,
249    pub(crate) query_fields: Vec<QueryField>,
250    pub(crate) field_aliases: Vec<(String, String)>,
251    pub(crate) query_indexes: Vec<QueryIndex>,
252    pub(crate) default_value: Option<String>, //TODO: find the issue where this field is listed
253}
254
255#[derive(Clone, Debug)]
256pub struct QueryField {
257    pub(crate) name: String,
258    pub(crate) type_name: String,
259    pub(crate) key_field: bool,
260    pub(crate) not_null_constraint: bool,
261}
262
263#[derive(Clone, Debug)]
264pub struct QueryIndex {
265    pub(crate) index_name: String,
266    pub(crate) index_type: IndexType,
267    pub(crate) inline_size: i32,
268    pub(crate) fields: Vec<(String, bool)>,
269}
270
271/// Ignite key-value cache. This cache is strongly typed and reading/writing some other
272/// types leads to errors.
273/// All caches created from the single IgniteClient shares the common TCP connection
274pub struct Cache<K: WritableType + ReadableType, V: WritableType + ReadableType> {
275    id: i32,
276    pub _name: String,
277    conn: Arc<Connection>,
278    k_phantom: PhantomData<K>,
279    v_phantom: PhantomData<V>,
280}
281
282impl<K: WritableType + ReadableType, V: WritableType + ReadableType> Cache<K, V> {
283    pub(crate) fn new(id: i32, name: String, conn: Arc<Connection>) -> Cache<K, V> {
284        Cache {
285            id,
286            _name: name,
287            conn,
288            k_phantom: PhantomData,
289            v_phantom: PhantomData,
290        }
291    }
292
293    pub fn get(&self, key: &K) -> IgniteResult<Option<V>> {
294        self.conn
295            .send_and_read(OpCode::CacheGet, CacheReq::Get::<K, V>(self.id, key))
296            .map(|resp: CacheDataObjectResp<V>| resp.val)
297    }
298
299    pub fn get_all(&self, keys: &[K]) -> IgniteResult<Vec<(Option<K>, Option<V>)>> {
300        self.conn
301            .send_and_read(OpCode::CacheGetAll, CacheReq::GetAll::<K, V>(self.id, keys))
302            .map(|resp: CachePairsResp<K, V>| resp.val)
303    }
304
305    pub fn put(&self, key: &K, value: &V) -> IgniteResult<()> {
306        self.conn
307            .send(OpCode::CachePut, CacheReq::Put::<K, V>(self.id, key, value))
308    }
309
310    pub fn put_all(&self, pairs: &[(K, V)]) -> IgniteResult<()> {
311        self.conn.send(
312            OpCode::CachePutAll,
313            CacheReq::PutAll::<K, V>(self.id, pairs),
314        )
315    }
316
317    pub fn contains_key(&self, key: &K) -> IgniteResult<bool> {
318        self.conn
319            .send_and_read(
320                OpCode::CacheContainsKey,
321                CacheReq::ContainsKey::<K, V>(self.id, key),
322            )
323            .map(|resp: CacheBoolResp| resp.flag)
324    }
325
326    pub fn contains_keys(&self, keys: &[K]) -> IgniteResult<bool> {
327        self.conn
328            .send_and_read(
329                OpCode::CacheContainsKeys,
330                CacheReq::ContainsKeys::<K, V>(self.id, keys),
331            )
332            .map(|resp: CacheBoolResp| resp.flag)
333    }
334
335    pub fn get_and_put(&self, key: &K, value: &V) -> IgniteResult<Option<V>> {
336        self.conn
337            .send_and_read(
338                OpCode::CacheGetAndPut,
339                CacheReq::GetAndPut::<K, V>(self.id, key, value),
340            )
341            .map(|resp: CacheDataObjectResp<V>| resp.val)
342    }
343
344    pub fn get_and_replace(&self, key: &K, value: &V) -> IgniteResult<Option<V>> {
345        self.conn
346            .send_and_read(
347                OpCode::CacheGetAndReplace,
348                CacheReq::GetAndReplace::<K, V>(self.id, key, value),
349            )
350            .map(|resp: CacheDataObjectResp<V>| resp.val)
351    }
352
353    pub fn get_and_remove(&self, key: &K) -> IgniteResult<Option<V>> {
354        self.conn
355            .send_and_read(
356                OpCode::CacheGetAndRemove,
357                CacheReq::GetAndRemove::<K, V>(self.id, key),
358            )
359            .map(|resp: CacheDataObjectResp<V>| resp.val)
360    }
361
362    pub fn put_if_absent(&self, key: &K, value: &V) -> IgniteResult<bool> {
363        self.conn
364            .send_and_read(
365                OpCode::CachePutIfAbsent,
366                CacheReq::PutIfAbsent::<K, V>(self.id, key, value),
367            )
368            .map(|resp: CacheBoolResp| resp.flag)
369    }
370
371    pub fn get_and_put_if_absent(&self, key: &K, value: &V) -> IgniteResult<Option<V>> {
372        self.conn
373            .send_and_read(
374                OpCode::CacheGetAndPutIfAbsent,
375                CacheReq::GetAndPutIfAbsent::<K, V>(self.id, key, value),
376            )
377            .map(|resp: CacheDataObjectResp<V>| resp.val)
378    }
379
380    pub fn replace(&self, key: &K, value: &V) -> IgniteResult<bool> {
381        self.conn
382            .send_and_read(
383                OpCode::CacheReplace,
384                CacheReq::Replace::<K, V>(self.id, key, value),
385            )
386            .map(|resp: CacheBoolResp| resp.flag)
387    }
388
389    pub fn replace_if_equals(&self, key: &K, old: &V, new: &V) -> IgniteResult<bool> {
390        self.conn
391            .send_and_read(
392                OpCode::CacheReplaceIfEquals,
393                CacheReq::ReplaceIfEquals::<K, V>(self.id, key, old, new),
394            )
395            .map(|resp: CacheBoolResp| resp.flag)
396    }
397
398    pub fn clear(&self) -> IgniteResult<()> {
399        self.conn
400            .send(OpCode::CacheClear, CacheReq::Clear::<K, V>(self.id))
401    }
402
403    pub fn clear_key(&self, key: &K) -> IgniteResult<()> {
404        self.conn.send(
405            OpCode::CacheClearKey,
406            CacheReq::ClearKey::<K, V>(self.id, key),
407        )
408    }
409
410    pub fn clear_keys(&self, keys: &[K]) -> IgniteResult<()> {
411        self.conn.send(
412            OpCode::CacheClearKeys,
413            CacheReq::ClearKeys::<K, V>(self.id, keys),
414        )
415    }
416
417    pub fn remove_key(&self, key: &K) -> IgniteResult<bool> {
418        self.conn
419            .send_and_read(
420                OpCode::CacheRemoveKey,
421                CacheReq::RemoveKey::<K, V>(self.id, key),
422            )
423            .map(|resp: CacheBoolResp| resp.flag)
424    }
425
426    pub fn remove_if_equals(&self, key: &K, value: &V) -> IgniteResult<bool> {
427        self.conn
428            .send_and_read(
429                OpCode::CacheRemoveIfEquals,
430                CacheReq::RemoveIfEquals::<K, V>(self.id, key, value),
431            )
432            .map(|resp: CacheBoolResp| resp.flag)
433    }
434
435    pub fn get_size(&self) -> IgniteResult<i64> {
436        let modes = Vec::new();
437        self.conn
438            .send_and_read(
439                OpCode::CacheGetSize,
440                CacheReq::GetSize::<K, V>(self.id, modes),
441            )
442            .map(|resp: CacheSizeResp| resp.size)
443    }
444
445    pub fn get_size_peek_mode(&self, mode: CachePeekMode) -> IgniteResult<i64> {
446        let modes = vec![mode];
447        self.conn
448            .send_and_read(
449                OpCode::CacheGetSize,
450                CacheReq::GetSize::<K, V>(self.id, modes),
451            )
452            .map(|resp: CacheSizeResp| resp.size)
453    }
454
455    pub fn get_size_peek_modes(&self, modes: Vec<CachePeekMode>) -> IgniteResult<i64> {
456        self.conn
457            .send_and_read(
458                OpCode::CacheGetSize,
459                CacheReq::GetSize::<K, V>(self.id, modes),
460            )
461            .map(|resp: CacheSizeResp| resp.size)
462    }
463
464    pub fn remove_keys(&self, keys: &[K]) -> IgniteResult<()> {
465        self.conn.send(
466            OpCode::CacheRemoveKeys,
467            CacheReq::RemoveKeys::<K, V>(self.id, keys),
468        )
469    }
470
471    pub fn remove_all(&self) -> IgniteResult<()> {
472        self.conn
473            .send(OpCode::CacheRemoveAll, CacheReq::RemoveAll::<K, V>(self.id))
474    }
475}