pravega_client/sync/
table.rs

1//
2// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10
11use crate::client_factory::ClientFactoryAsync;
12use crate::segment::raw_client::{RawClient, RawClientError};
13use crate::util::get_request_id;
14
15use pravega_client_auth::DelegationTokenProvider;
16use pravega_client_retry::retry_async::retry_async;
17use pravega_client_retry::retry_result::RetryResult;
18use pravega_client_shared::{PravegaNodeUri, Stream as PravegaStream};
19use pravega_client_shared::{Scope, ScopedSegment, ScopedStream, Segment};
20use pravega_wire_protocol::commands::{
21    CreateTableSegmentCommand, DeleteTableSegmentCommand, ReadTableCommand, ReadTableEntriesCommand,
22    ReadTableEntriesDeltaCommand, ReadTableKeysCommand, RemoveTableKeysCommand, TableEntries, TableKey,
23    TableValue, UpdateTableEntriesCommand,
24};
25use pravega_wire_protocol::wire_commands::{Replies, Requests};
26
27use async_stream::try_stream;
28use futures::stream::Stream;
29use serde::Serialize;
30use serde_cbor::from_slice;
31use serde_cbor::to_vec;
32use snafu::Snafu;
33use tracing::{debug, info};
34
35pub type Version = i64;
36
37const KVTABLE_SUFFIX: &str = "_kvtable";
38
39#[derive(Debug, Snafu)]
40pub enum TableError {
41    #[snafu(display("Connection error while performing {}: {}", operation, source))]
42    ConnectionError {
43        can_retry: bool,
44        operation: String,
45        source: RawClientError,
46    },
47    #[snafu(display("Key does not exist while performing {}: {}", operation, error_msg))]
48    KeyDoesNotExist { operation: String, error_msg: String },
49    #[snafu(display("Table {} does not exist while performing {}", name, operation))]
50    TableDoesNotExist { operation: String, name: String },
51    #[snafu(display(
52        "Incorrect Key version observed while performing {}: {}",
53        operation,
54        error_msg
55    ))]
56    IncorrectKeyVersion { operation: String, error_msg: String },
57    #[snafu(display("Error observed while performing {} due to {}", operation, error_msg,))]
58    OperationError { operation: String, error_msg: String },
59}
60
61/// Table is the client implementation of Table Segment in Pravega.
62/// Table Segment is a key-value table based on Pravega segment.
63///
64/// # Examples
65/// ```ignore
66/// let map = client_factory.create_table(scope, "table".into()).await;
67/// let k: String = "key".into();
68/// let v: String = "val".into();
69/// let result = map.insert(&k, &v, -1).await;
70/// assert!(result.is_ok());
71/// let result: Result<Option<(String, Version)>, TableError> = map.get(&k).await;
72/// assert!(result.is_ok());
73/// ```
74pub struct Table {
75    // name should be unique as it is used to construct the internal stream.
76    // different table with same name will share the same state.
77    name: String,
78    endpoint: PravegaNodeUri,
79    factory: ClientFactoryAsync,
80    delegation_token_provider: DelegationTokenProvider,
81}
82
83impl Table {
84    ///
85    /// Method that is used to delete a Table Segment
86    ///
87    pub(crate) async fn delete(
88        scope: Scope,
89        name: String,
90        factory: ClientFactoryAsync,
91    ) -> Result<(), TableError> {
92        let segment = ScopedSegment {
93            scope,
94            stream: PravegaStream::from(format!("{}{}", name, KVTABLE_SUFFIX)),
95            segment: Segment::from(0),
96        };
97        info!("deleting table map on {:?}", segment);
98
99        let delegation_token_provider = factory
100            .create_delegation_token_provider(ScopedStream::from(&segment))
101            .await;
102        let op = "Delete table segment";
103        retry_async(factory.config().retry_policy, || {
104            delete_table_segment(&factory, &segment, &delegation_token_provider)
105        })
106        .await
107        .map_err(|e| TableError::ConnectionError {
108            can_retry: true,
109            operation: op.to_string(),
110            source: e.error,
111        })
112        .and_then(|r| match r {
113            Replies::SegmentDeleted(..) | Replies::NoSuchSegment(..) => {
114                info!("Table segment {:?} deleted", segment);
115                Ok(())
116            }
117            _ => Err(TableError::OperationError {
118                operation: op.to_string(),
119                error_msg: r.to_string(),
120            }),
121        })
122    }
123
124    pub(crate) async fn new(
125        scope: Scope,
126        name: String,
127        factory: ClientFactoryAsync,
128    ) -> Result<Table, TableError> {
129        let segment = ScopedSegment {
130            scope,
131            stream: PravegaStream::from(format!("{}{}", name, KVTABLE_SUFFIX)),
132            segment: Segment::from(0),
133        };
134        info!("creating table map on {:?}", segment);
135
136        let delegation_token_provider = factory
137            .create_delegation_token_provider(ScopedStream::from(&segment))
138            .await;
139
140        let op = "Create table segment";
141        retry_async(factory.config().retry_policy, || async {
142            let req = Requests::CreateTableSegment(CreateTableSegmentCommand {
143                request_id: get_request_id(),
144                segment: segment.to_string(),
145                delegation_token: delegation_token_provider
146                    .retrieve_token(factory.controller_client())
147                    .await,
148            });
149
150            let endpoint = factory
151                .controller_client()
152                .get_endpoint_for_segment(&segment)
153                .await
154                .expect("get endpoint for segment");
155            debug!("endpoint is {:?}", endpoint);
156
157            let result = factory
158                .create_raw_client_for_endpoint(endpoint.clone())
159                .send_request(&req)
160                .await;
161            match result {
162                Ok(reply) => RetryResult::Success((reply, endpoint)),
163                Err(e) => {
164                    if e.is_token_expired() {
165                        delegation_token_provider.signal_token_expiry();
166                        debug!("auth token needs to refresh");
167                    }
168                    debug!("retry on error {:?}", e);
169                    RetryResult::Retry(e)
170                }
171            }
172        })
173        .await
174        .map_err(|e| TableError::ConnectionError {
175            can_retry: true,
176            operation: op.to_string(),
177            source: e.error,
178        })
179        .and_then(|(r, endpoint)| match r {
180            Replies::SegmentCreated(..) | Replies::SegmentAlreadyExists(..) => {
181                info!("Table segment {:?} created", segment);
182                let table_map = Table {
183                    name: segment.to_string(),
184                    endpoint,
185                    factory,
186                    delegation_token_provider,
187                };
188                Ok(table_map)
189            }
190            _ => Err(TableError::OperationError {
191                operation: op.to_string(),
192                error_msg: r.to_string(),
193            }),
194        })
195    }
196
197    /// Return the latest value corresponding to the key.
198    ///
199    /// If the map does not have the key [`None`] is returned. The version number of the Value is
200    /// returned by the API.
201    pub async fn get<K, V>(&self, k: &K) -> Result<Option<(V, Version)>, TableError>
202    where
203        K: Serialize + serde::de::DeserializeOwned,
204        V: Serialize + serde::de::DeserializeOwned,
205    {
206        let key = to_vec(k).expect("error during serialization.");
207        let read_result = self.get_raw_values(vec![key]).await;
208        read_result.map(|v| {
209            let (l, version) = &v[0];
210            if l.is_empty() {
211                None
212            } else {
213                let value: V = from_slice(l.as_slice()).expect("error during deserialization");
214                Some((value, *version))
215            }
216        })
217    }
218
219    /// Unconditionally insert a new or update an existing entry for the given key.
220    /// Once the update is performed the newer version is returned.
221    pub async fn insert<K, V>(&self, k: &K, v: &V, offset: i64) -> Result<Version, TableError>
222    where
223        K: Serialize + serde::de::DeserializeOwned,
224        V: Serialize + serde::de::DeserializeOwned,
225    {
226        // use KEY_NO_VERSION to ensure unconditional update.
227        self.insert_conditionally(k, v, TableKey::KEY_NO_VERSION, offset)
228            .await
229    }
230
231    /// Conditionally insert a key-value pair into the table map. The Key and Value are serialized to bytes using
232    /// cbor.
233    ///
234    /// The insert is performed after checking the key_version passed.
235    /// Once the update is done the newer version is returned.
236    /// TableError::BadKeyVersion is returned in case of an incorrect key version.
237    pub async fn insert_conditionally<K, V>(
238        &self,
239        k: &K,
240        v: &V,
241        key_version: Version,
242        offset: i64,
243    ) -> Result<Version, TableError>
244    where
245        K: Serialize + serde::de::DeserializeOwned,
246        V: Serialize + serde::de::DeserializeOwned,
247    {
248        let key = to_vec(k).expect("error during serialization.");
249        let val = to_vec(v).expect("error during serialization.");
250        self.insert_raw_values(vec![(key, val, key_version)], offset)
251            .await
252            .map(|versions| versions[0])
253    }
254
255    /// Unconditionally remove a key from the Table. If the key does not exist an Ok(()) is returned.
256    pub async fn remove<K: Serialize + serde::de::DeserializeOwned>(
257        &self,
258        k: &K,
259        offset: i64,
260    ) -> Result<(), TableError> {
261        self.remove_conditionally(k, TableKey::KEY_NO_VERSION, offset)
262            .await
263    }
264
265    /// Conditionally remove a key from the Table if it matches the provided key version.
266    /// TableError::BadKeyVersion is returned in case the version does not exist.
267    pub async fn remove_conditionally<K>(
268        &self,
269        k: &K,
270        key_version: Version,
271        offset: i64,
272    ) -> Result<(), TableError>
273    where
274        K: Serialize + serde::de::DeserializeOwned,
275    {
276        let key = to_vec(k).expect("error during serialization.");
277        self.remove_raw_values(vec![(key, key_version)], offset).await
278    }
279
280    /// Return the latest values for a given list of keys. If the table does not have a
281    /// key a `None` is returned for the corresponding key. The version number of the Value is also
282    /// returned by the API
283    pub async fn get_all<K, V>(&self, keys: Vec<&K>) -> Result<Vec<Option<(V, Version)>>, TableError>
284    where
285        K: Serialize + serde::de::DeserializeOwned,
286        V: Serialize + serde::de::DeserializeOwned,
287    {
288        let keys_raw: Vec<Vec<u8>> = keys
289            .iter()
290            .map(|k| to_vec(*k).expect("error during serialization."))
291            .collect();
292
293        let read_result: Result<Vec<(Vec<u8>, Version)>, TableError> = self.get_raw_values(keys_raw).await;
294        read_result.map(|v| {
295            v.iter()
296                .map(|(data, version)| {
297                    if data.is_empty() {
298                        None
299                    } else {
300                        let value: V = from_slice(data.as_slice()).expect("error during deserialization");
301                        Some((value, *version))
302                    }
303                })
304                .collect()
305        })
306    }
307
308    /// Unconditionally insert a new or updates an existing entry for the given keys.
309    /// Once the update is performed the newer versions are returned.
310    pub async fn insert_all<K, V>(&self, kvps: Vec<(&K, &V)>, offset: i64) -> Result<Vec<Version>, TableError>
311    where
312        K: Serialize + serde::de::DeserializeOwned,
313        V: Serialize + serde::de::DeserializeOwned,
314    {
315        let r: Vec<(Vec<u8>, Vec<u8>, Version)> = kvps
316            .iter()
317            .map(|(k, v)| {
318                (
319                    to_vec(k).expect("error during serialization."),
320                    to_vec(v).expect("error during serialization."),
321                    TableKey::KEY_NO_VERSION,
322                )
323            })
324            .collect();
325        self.insert_raw_values(r, offset).await
326    }
327
328    /// Conditionally insert key-value pairs into the table. The Key and Value are serialized to to bytes using
329    /// cbor
330    ///
331    /// The insert is performed after checking the key_version passed, in case of a failure none of the key-value pairs
332    /// are persisted.
333    /// Once the update is done the newer version is returned.
334    /// TableError::BadKeyVersion is returned in case of an incorrect key version.
335    pub async fn insert_conditionally_all<K, V>(
336        &self,
337        kvps: Vec<(&K, &V, Version)>,
338        offset: i64,
339    ) -> Result<Vec<Version>, TableError>
340    where
341        K: Serialize + serde::de::DeserializeOwned,
342        V: Serialize + serde::de::DeserializeOwned,
343    {
344        let r: Vec<(Vec<u8>, Vec<u8>, Version)> = kvps
345            .iter()
346            .map(|(k, v, ver)| {
347                (
348                    to_vec(k).expect("error during serialization."),
349                    to_vec(v).expect("error during serialization."),
350                    *ver,
351                )
352            })
353            .collect();
354        self.insert_raw_values(r, offset).await
355    }
356
357    /// Unconditionally remove the provided keys from the table.
358    pub async fn remove_all<K>(&self, keys: Vec<&K>, offset: i64) -> Result<(), TableError>
359    where
360        K: Serialize + serde::de::DeserializeOwned,
361    {
362        let r: Vec<(&K, Version)> = keys.iter().map(|k| (*k, TableKey::KEY_NO_VERSION)).collect();
363        self.remove_conditionally_all(r, offset).await
364    }
365
366    /// Conditionally remove keys after checking the key version. In case of a failure none of the keys
367    /// are removed.
368    pub async fn remove_conditionally_all<K>(
369        &self,
370        keys: Vec<(&K, Version)>,
371        offset: i64,
372    ) -> Result<(), TableError>
373    where
374        K: Serialize + serde::de::DeserializeOwned,
375    {
376        let r: Vec<(Vec<u8>, Version)> = keys
377            .iter()
378            .map(|(k, v)| (to_vec(k).expect("error during serialization."), *v))
379            .collect();
380        self.remove_raw_values(r, offset).await
381    }
382
383    /// Read keys as an Async Stream. This method deserializes the Key based on the type.
384    pub fn read_keys_stream<'stream, 'map: 'stream, K: 'stream>(
385        &'map self,
386        max_keys_at_once: i32,
387    ) -> impl Stream<Item = Result<(K, Version), TableError>> + 'stream
388    where
389        K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
390    {
391        try_stream! {
392            let mut token: Vec<u8> = Vec::new();
393            loop {
394                let res: (Vec<(Vec<u8>, Version)>, Vec<u8>) = self.read_keys_raw(max_keys_at_once, &token).await?;
395                let (keys, t) = res;
396                if keys.is_empty() {
397                    break;
398                } else {
399                    for (key_raw, version) in keys {
400                       let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
401                        yield (key, version)
402                    }
403                    token = t;
404                }
405             }
406        }
407    }
408
409    /// Read entries as an Async Stream. This method deserialized the Key and Value based on the
410    /// inferred type.
411    pub fn read_entries_stream<'stream, 'map: 'stream, K: 'map, V: 'map>(
412        &'map self,
413        max_entries_at_once: i32,
414    ) -> impl Stream<Item = Result<(K, V, Version), TableError>> + 'stream
415    where
416        K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
417        V: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
418    {
419        try_stream! {
420            let mut token: Vec<u8> = Vec::new();
421            loop {
422                let res: (Vec<(Vec<u8>, Vec<u8>,Version)>, Vec<u8>)  = self.read_entries_raw(max_entries_at_once, &token).await?;
423                let (entries, t) = res;
424                if entries.is_empty() {
425                    break;
426                } else {
427                    for (key_raw, value_raw, version) in entries {
428                        let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
429                        let value: V = from_slice(value_raw.as_slice()).expect("error during deserialization");
430                        yield (key, value, version)
431                    }
432                    token = t;
433                }
434            }
435        }
436    }
437
438    /// Read entries as an Async Stream from a given position. This method deserialized the Key and Value based on the
439    /// inferred type.
440    pub fn read_entries_stream_from_position<'stream, 'map: 'stream, K: 'map, V: 'map>(
441        &'map self,
442        max_entries_at_once: i32,
443        mut from_position: i64,
444    ) -> impl Stream<Item = Result<(K, V, Version, i64), TableError>> + 'stream
445    where
446        K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
447        V: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
448    {
449        try_stream! {
450            loop {
451                let res: (Vec<(Vec<u8>, Vec<u8>,Version)>, i64)  = self.read_entries_raw_delta(max_entries_at_once, from_position).await?;
452                let (entries, last_position) = res;
453                if entries.is_empty() {
454                    break;
455                } else {
456                    for (key_raw, value_raw, version) in entries {
457                        let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
458                        let value: V = from_slice(value_raw.as_slice()).expect("error during deserialization");
459                        yield (key, value, version, last_position)
460                    }
461                    from_position = last_position;
462                }
463            }
464        }
465    }
466
467    /// Get a list of keys in the table map for a given continuation token.
468    /// It returns a Vector of Key with its version and a continuation token that can be used to
469    /// fetch the next set of keys.An empty Vector as the continuation token will result in the keys
470    /// being fetched from the beginning.
471    async fn get_keys<K>(
472        &self,
473        max_keys_at_once: i32,
474        token: &[u8],
475    ) -> Result<(Vec<(K, Version)>, Vec<u8>), TableError>
476    where
477        K: Serialize + serde::de::DeserializeOwned,
478    {
479        let res = self.read_keys_raw(max_keys_at_once, token).await;
480        res.map(|(keys, token)| {
481            let keys_de: Vec<(K, Version)> = keys
482                .iter()
483                .map(|(k, version)| {
484                    let key: K = from_slice(k.as_slice()).expect("error during deserialization");
485                    (key, *version)
486                })
487                .collect();
488            (keys_de, token)
489        })
490    }
491
492    /// Get a list of entries in the table map for a given continuation token.
493    /// It returns a Vector of Key with its version and a continuation token that can be used to
494    /// fetch the next set of entries. An empty Vector as the continuation token will result in the keys
495    /// being fetched from the beginning.
496    async fn get_entries<K, V>(
497        &self,
498        max_entries_at_once: i32,
499        token: &[u8],
500    ) -> Result<(Vec<(K, V, Version)>, Vec<u8>), TableError>
501    where
502        K: Serialize + serde::de::DeserializeOwned,
503        V: Serialize + serde::de::DeserializeOwned,
504    {
505        let res = self.read_entries_raw(max_entries_at_once, token).await;
506        res.map(|(entries, token)| {
507            let entries_de: Vec<(K, V, Version)> = entries
508                .iter()
509                .map(|(k, v, version)| {
510                    let key: K = from_slice(k.as_slice()).expect("error during deserialization");
511                    let value: V = from_slice(v.as_slice()).expect("error during deserialization");
512                    (key, value, *version)
513                })
514                .collect();
515            (entries_de, token)
516        })
517    }
518
519    /// Get a list of entries in the table from a given position.
520    async fn get_entries_delta<K, V>(
521        &self,
522        max_entries_at_once: i32,
523        from_position: i64,
524    ) -> Result<(Vec<(K, V, Version)>, i64), TableError>
525    where
526        K: Serialize + serde::de::DeserializeOwned,
527        V: Serialize + serde::de::DeserializeOwned,
528    {
529        let res = self
530            .read_entries_raw_delta(max_entries_at_once, from_position)
531            .await;
532        res.map(|(entries, token)| {
533            let entries_de: Vec<(K, V, Version)> = entries
534                .iter()
535                .map(|(k, v, version)| {
536                    let key: K = from_slice(k.as_slice()).expect("error during deserialization");
537                    let value: V = from_slice(v.as_slice()).expect("error during deserialization");
538                    (key, value, *version)
539                })
540                .collect();
541            (entries_de, token)
542        })
543    }
544
545    /// Insert key value pairs without serialization.
546    /// The function returns the newer version number post the insert operation.
547    async fn insert_raw_values(
548        &self,
549        kvps: Vec<(Vec<u8>, Vec<u8>, Version)>,
550        offset: i64,
551    ) -> Result<Vec<Version>, TableError> {
552        let op = "Insert into tablemap";
553
554        retry_async(self.factory.config().retry_policy, || async {
555            let entries: Vec<(TableKey, TableValue)> = kvps
556                .iter()
557                .map(|(k, v, ver)| {
558                    let tk = TableKey::new(k.clone(), *ver);
559                    let tv = TableValue::new(v.clone());
560                    (tk, tv)
561                })
562                .collect();
563            let te = TableEntries { entries };
564
565            let req = Requests::UpdateTableEntries(UpdateTableEntriesCommand {
566                request_id: get_request_id(),
567                segment: self.name.clone(),
568                delegation_token: self
569                    .delegation_token_provider
570                    .retrieve_token(self.factory.controller_client())
571                    .await,
572                table_entries: te,
573                table_segment_offset: offset,
574            });
575            let result = self
576                .factory
577                .create_raw_client_for_endpoint(self.endpoint.clone())
578                .send_request(&req)
579                .await;
580            match result {
581                Ok(reply) => RetryResult::Success(reply),
582                Err(e) => {
583                    if e.is_token_expired() {
584                        self.delegation_token_provider.signal_token_expiry();
585                        info!("auth token needs to refresh");
586                    }
587                    info!("Table insert retry error {:?}", e);
588                    RetryResult::Retry(e)
589                }
590            }
591        })
592        .await
593        .map_err(|e| TableError::ConnectionError {
594            can_retry: true,
595            operation: op.into(),
596            source: e.error,
597        })
598        .and_then(|r| match r {
599            Replies::TableEntriesUpdated(c) => Ok(c.updated_versions),
600            Replies::TableKeyBadVersion(c) => Err(TableError::IncorrectKeyVersion {
601                operation: op.into(),
602                error_msg: c.to_string(),
603            }),
604            // unexpected response from Segment store causes a panic.
605            _ => Err(TableError::OperationError {
606                operation: op.into(),
607                error_msg: r.to_string(),
608            }),
609        })
610    }
611
612    /// Get raw bytes for a given Key. If no value is present then None is returned.
613    /// The read result and the corresponding version is returned as a tuple.
614    async fn get_raw_values(&self, keys: Vec<Vec<u8>>) -> Result<Vec<(Vec<u8>, Version)>, TableError> {
615        let op = "Read from tablemap";
616
617        retry_async(self.factory.config().retry_policy, || async {
618            let table_keys: Vec<TableKey> = keys
619                .iter()
620                .map(|k| TableKey::new(k.clone(), TableKey::KEY_NO_VERSION))
621                .collect();
622
623            let req = Requests::ReadTable(ReadTableCommand {
624                request_id: get_request_id(),
625                segment: self.name.clone(),
626                delegation_token: self
627                    .delegation_token_provider
628                    .retrieve_token(self.factory.controller_client())
629                    .await,
630                keys: table_keys,
631            });
632            let result = self
633                .factory
634                .create_raw_client_for_endpoint(self.endpoint.clone())
635                .send_request(&req)
636                .await;
637            debug!("Read Response {:?}", result);
638            match result {
639                Ok(reply) => RetryResult::Success(reply),
640                Err(e) => {
641                    if e.is_token_expired() {
642                        self.delegation_token_provider.signal_token_expiry();
643                        info!("auth token needs to refresh");
644                    }
645                    RetryResult::Retry(e)
646                }
647            }
648        })
649        .await
650        .map_err(|e| TableError::ConnectionError {
651            can_retry: true,
652            operation: op.into(),
653            source: e.error,
654        })
655        .and_then(|reply| match reply {
656            Replies::TableRead(c) => {
657                let v: Vec<(TableKey, TableValue)> = c.entries.entries;
658                if v.is_empty() {
659                    // partial response from Segment store causes a panic.
660                    panic!("Invalid response from the Segment store");
661                } else {
662                    //fetch value and corresponding version.
663                    let result: Vec<(Vec<u8>, Version)> =
664                        v.iter().map(|(l, r)| (r.data.clone(), l.key_version)).collect();
665                    Ok(result)
666                }
667            }
668            _ => Err(TableError::OperationError {
669                operation: op.into(),
670                error_msg: reply.to_string(),
671            }),
672        })
673    }
674
675    /// Remove a list of keys where the key, represented in raw bytes, and version of the corresponding
676    /// keys is specified.
677    async fn remove_raw_values(&self, keys: Vec<(Vec<u8>, Version)>, offset: i64) -> Result<(), TableError> {
678        let op = "Remove keys from table";
679
680        retry_async(self.factory.config().retry_policy, || async {
681            let tks: Vec<TableKey> = keys
682                .iter()
683                .map(|(k, ver)| TableKey::new(k.clone(), *ver))
684                .collect();
685
686            let req = Requests::RemoveTableKeys(RemoveTableKeysCommand {
687                request_id: get_request_id(),
688                segment: self.name.clone(),
689                delegation_token: self
690                    .delegation_token_provider
691                    .retrieve_token(self.factory.controller_client())
692                    .await,
693                keys: tks,
694                table_segment_offset: offset,
695            });
696            let result = self
697                .factory
698                .create_raw_client_for_endpoint(self.endpoint.clone())
699                .send_request(&req)
700                .await;
701            debug!("Reply for RemoveTableKeys request {:?}", result);
702            match result {
703                Ok(reply) => RetryResult::Success(reply),
704                Err(e) => {
705                    if e.is_token_expired() {
706                        self.delegation_token_provider.signal_token_expiry();
707                        debug!("auth token needs to refresh");
708                    }
709                    debug!("retry on error {:?}", e);
710                    RetryResult::Retry(e)
711                }
712            }
713        })
714        .await
715        .map_err(|e| TableError::ConnectionError {
716            can_retry: true,
717            operation: op.into(),
718            source: e.error,
719        })
720        .and_then(|r| match r {
721            Replies::TableKeysRemoved(..) => Ok(()),
722            Replies::TableKeyBadVersion(c) => Err(TableError::IncorrectKeyVersion {
723                operation: op.into(),
724                error_msg: c.to_string(),
725            }),
726            Replies::TableKeyDoesNotExist(c) => Err(TableError::KeyDoesNotExist {
727                operation: op.into(),
728                error_msg: c.to_string(),
729            }),
730            _ => Err(TableError::OperationError {
731                operation: op.into(),
732                error_msg: r.to_string(),
733            }),
734        })
735    }
736
737    /// Read the raw keys from the table map. It returns a list of keys and its versions with a continuation token.
738    async fn read_keys_raw(
739        &self,
740        max_keys_at_once: i32,
741        token: &[u8],
742    ) -> Result<(Vec<(Vec<u8>, Version)>, Vec<u8>), TableError> {
743        let op = "Read keys";
744
745        retry_async(self.factory.config().retry_policy, || async {
746            let req = Requests::ReadTableKeys(ReadTableKeysCommand {
747                request_id: get_request_id(),
748                segment: self.name.clone(),
749                delegation_token: self
750                    .delegation_token_provider
751                    .retrieve_token(self.factory.controller_client())
752                    .await,
753                suggested_key_count: max_keys_at_once,
754                continuation_token: token.to_vec(),
755            });
756            let result = self
757                .factory
758                .create_raw_client_for_endpoint(self.endpoint.clone())
759                .send_request(&req)
760                .await;
761            match result {
762                Ok(reply) => RetryResult::Success(reply),
763                Err(e) => {
764                    if e.is_token_expired() {
765                        self.delegation_token_provider.signal_token_expiry();
766                        info!("auth token needs to refresh");
767                    }
768                    RetryResult::Retry(e)
769                }
770            }
771        })
772        .await
773        .map_err(|e| TableError::ConnectionError {
774            can_retry: true,
775            operation: op.into(),
776            source: e.error,
777        })
778        .and_then(|r| match r {
779            Replies::TableKeysRead(c) => {
780                let keys: Vec<(Vec<u8>, Version)> =
781                    c.keys.iter().map(|k| (k.data.clone(), k.key_version)).collect();
782
783                Ok((keys, c.continuation_token))
784            }
785            _ => Err(TableError::OperationError {
786                operation: op.into(),
787                error_msg: r.to_string(),
788            }),
789        })
790    }
791
792    /// Read the raw entries from the table map. It returns a list of key-values and its versions with a continuation token.
793    async fn read_entries_raw(
794        &self,
795        max_entries_at_once: i32,
796        token: &[u8],
797    ) -> Result<(Vec<(Vec<u8>, Vec<u8>, Version)>, Vec<u8>), TableError> {
798        let op = "Read entries";
799
800        retry_async(self.factory.config().retry_policy, || async {
801            let req = Requests::ReadTableEntries(ReadTableEntriesCommand {
802                request_id: get_request_id(),
803                segment: self.name.clone(),
804                delegation_token: self
805                    .delegation_token_provider
806                    .retrieve_token(self.factory.controller_client())
807                    .await,
808                suggested_entry_count: max_entries_at_once,
809                continuation_token: token.to_vec(),
810            });
811            let result = self
812                .factory
813                .create_raw_client_for_endpoint(self.endpoint.clone())
814                .send_request(&req)
815                .await;
816            debug!("Reply for read tableEntries request {:?}", result);
817
818            match result {
819                Ok(reply) => RetryResult::Success(reply),
820                Err(e) => {
821                    if e.is_token_expired() {
822                        self.delegation_token_provider.signal_token_expiry();
823                        info!("auth token needs to refresh");
824                    }
825                    RetryResult::Retry(e)
826                }
827            }
828        })
829        .await
830        .map_err(|e| TableError::ConnectionError {
831            can_retry: true,
832            operation: op.into(),
833            source: e.error,
834        })
835        .and_then(|r| {
836            match r {
837                Replies::TableEntriesRead(c) => {
838                    let entries: Vec<(Vec<u8>, Vec<u8>, Version)> = c
839                        .entries
840                        .entries
841                        .iter()
842                        .map(|(k, v)| (k.data.clone(), v.data.clone(), k.key_version))
843                        .collect();
844
845                    Ok((entries, c.continuation_token))
846                }
847                // unexpected response from Segment store causes a panic.
848                _ => Err(TableError::OperationError {
849                    operation: op.into(),
850                    error_msg: r.to_string(),
851                }),
852            }
853        })
854    }
855
856    /// Read the raw entries from the table map from a given position.
857    /// It returns a list of key-values and its versions with a latest position.
858    async fn read_entries_raw_delta(
859        &self,
860        max_entries_at_once: i32,
861        from_position: i64,
862    ) -> Result<(Vec<(Vec<u8>, Vec<u8>, Version)>, i64), TableError> {
863        let op = "Read entries delta";
864
865        retry_async(self.factory.config().retry_policy, || async {
866            let req = Requests::ReadTableEntriesDelta(ReadTableEntriesDeltaCommand {
867                request_id: get_request_id(),
868                segment: self.name.clone(),
869                delegation_token: self
870                    .delegation_token_provider
871                    .retrieve_token(self.factory.controller_client())
872                    .await,
873                from_position,
874                suggested_entry_count: max_entries_at_once,
875            });
876            let result = self
877                .factory
878                .create_raw_client_for_endpoint(self.endpoint.clone())
879                .send_request(&req)
880                .await;
881
882            match result {
883                Ok(reply) => RetryResult::Success(reply),
884                Err(e) => {
885                    if e.is_token_expired() {
886                        self.delegation_token_provider.signal_token_expiry();
887                        info!("auth token needs to refresh");
888                    }
889                    RetryResult::Retry(e)
890                }
891            }
892        })
893        .await
894        .map_err(|e| TableError::ConnectionError {
895            can_retry: true,
896            operation: op.into(),
897            source: e.error,
898        })
899        .and_then(|r| {
900            match r {
901                Replies::TableEntriesDeltaRead(c) => {
902                    let entries: Vec<(Vec<u8>, Vec<u8>, Version)> = c
903                        .entries
904                        .entries
905                        .iter()
906                        .map(|(k, v)| (k.data.clone(), v.data.clone(), k.key_version))
907                        .collect();
908
909                    Ok((entries, c.last_position))
910                }
911                Replies::NoSuchSegment(c) => {
912                    debug!("Received NoSuchSegment, the table segment is deleted {:?}", c);
913                    Err(TableError::TableDoesNotExist {
914                        operation: op.into(),
915                        name: c.segment,
916                    })
917                }
918                // unexpected response from Segment store causes a panic.
919                _ => Err(TableError::OperationError {
920                    operation: op.into(),
921                    error_msg: "Unexpected response received from Segment Store".to_string(),
922                }),
923            }
924        })
925    }
926}
927
928async fn delete_table_segment(
929    factory: &ClientFactoryAsync,
930    segment: &ScopedSegment,
931    delegation_token_provider: &DelegationTokenProvider,
932) -> RetryResult<Replies, RawClientError> {
933    let req = Requests::DeleteTableSegment(DeleteTableSegmentCommand {
934        request_id: get_request_id(),
935        segment: segment.to_string(),
936        must_be_empty: false,
937        delegation_token: delegation_token_provider
938            .retrieve_token(factory.controller_client())
939            .await,
940    });
941
942    let endpoint = factory
943        .controller_client()
944        .get_endpoint_for_segment(segment)
945        .await
946        .expect("get endpoint for segment");
947    debug!("endpoint is {:?}", endpoint);
948
949    let result = factory
950        .create_raw_client_for_endpoint(endpoint.clone())
951        .send_request(&req)
952        .await;
953    match result {
954        Ok(reply) => RetryResult::Success(reply),
955        Err(e) => {
956            if e.is_token_expired() {
957                delegation_token_provider.signal_token_expiry();
958                debug!("auth token needs to refresh");
959            }
960            debug!("retry on error {:?}", e);
961            RetryResult::Retry(e)
962        }
963    }
964}
965
966#[cfg(test)]
967mod test {
968    use super::*;
969    use crate::client_factory::ClientFactory;
970    use pravega_client_config::connection_type::{ConnectionType, MockType};
971    use pravega_client_config::ClientConfigBuilder;
972    use pravega_client_shared::PravegaNodeUri;
973    use tokio::runtime::Runtime;
974
975    #[test]
976    fn test_table_map_unconditional_insert_and_remove() {
977        let mut rt = Runtime::new().unwrap();
978        let table_map = create_table_map(&mut rt);
979
980        // unconditionally insert non-existing key
981        let version = rt
982            .block_on(table_map.insert(&"key".to_string(), &"value".to_string(), -1))
983            .expect("unconditionally insert into table map");
984        assert_eq!(version, 0);
985
986        // unconditionally insert existing key
987        let version = rt
988            .block_on(table_map.insert(&"key".to_string(), &"value".to_string(), -1))
989            .expect("unconditionally insert into table map");
990        assert_eq!(version, 1);
991
992        // unconditionally remove
993        rt.block_on(table_map.remove(&"key".to_string(), -1))
994            .expect("remove key");
995
996        // get the key, should return None
997        let option: Option<(String, Version)> = rt
998            .block_on(table_map.get(&"key".to_string()))
999            .expect("remove key");
1000        assert!(option.is_none());
1001    }
1002
1003    #[test]
1004    fn test_table_map_conditional_insert_and_remove() {
1005        let mut rt = Runtime::new().unwrap();
1006        let table_map = create_table_map(&mut rt);
1007
1008        // conditionally insert non-existing key
1009        let version = rt
1010            .block_on(table_map.insert_conditionally(&"key".to_string(), &"value".to_string(), -1, -1))
1011            .expect("unconditionally insert into table map");
1012        assert_eq!(version, 0);
1013        // conditionally insert existing key
1014        let version = rt
1015            .block_on(table_map.insert_conditionally(&"key".to_string(), &"value".to_string(), 0, -1))
1016            .expect("conditionally insert into table map");
1017        assert_eq!(version, 1);
1018        // conditionally insert key with wrong version
1019        let result =
1020            rt.block_on(table_map.insert_conditionally(&"key".to_string(), &"value".to_string(), 0, -1));
1021        assert!(result.is_err());
1022        // conditionally remove key
1023        let result = rt.block_on(table_map.remove_conditionally(&"key".to_string(), 1, -1));
1024        assert!(result.is_ok());
1025
1026        // get the key, should return None
1027        let option: Option<(String, Version)> = rt
1028            .block_on(table_map.get(&"key".to_string()))
1029            .expect("remove key");
1030        assert!(option.is_none());
1031    }
1032
1033    #[test]
1034    fn test_table_map_insert_remove_all() {
1035        let mut rt = Runtime::new().unwrap();
1036        let table_map = create_table_map(&mut rt);
1037
1038        // conditionally insert all
1039        let mut kvs = vec![];
1040        let k1 = "k1".to_string();
1041        let v1 = "v1".to_string();
1042        let k2 = "k2".to_string();
1043        let v2 = "v2".to_string();
1044        kvs.push((&k1, &v1, -1));
1045        kvs.push((&k2, &v2, -1));
1046
1047        let version = rt
1048            .block_on(table_map.insert_conditionally_all(kvs, -1))
1049            .expect("unconditionally insert all into table map");
1050        let expected = vec![0, 0];
1051        assert_eq!(version, expected);
1052
1053        // conditionally remove all
1054        let ks = vec![(&k1, 0), (&k2, 0)];
1055        rt.block_on(table_map.remove_conditionally_all(ks, -1))
1056            .expect("conditionally remove all from table map");
1057
1058        // get the key, should return None
1059        let option: Option<(String, Version)> =
1060            rt.block_on(table_map.get(&"k1".to_string())).expect("remove key");
1061        assert!(option.is_none());
1062        let option: Option<(String, Version)> =
1063            rt.block_on(table_map.get(&"k2".to_string())).expect("remove key");
1064        assert!(option.is_none());
1065    }
1066
1067    // helper function
1068    fn create_table_map(rt: &mut Runtime) -> Table {
1069        let config = ClientConfigBuilder::default()
1070            .connection_type(ConnectionType::Mock(MockType::Happy))
1071            .mock(true)
1072            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091"))
1073            .build()
1074            .unwrap();
1075        let factory = ClientFactory::new(config);
1076        let scope = Scope {
1077            name: "tablemapScope".to_string(),
1078        };
1079        rt.block_on(factory.create_table(scope, "tablemap".to_string()))
1080    }
1081}