1use crate::client_factory::ClientFactoryAsync;
12use crate::segment::raw_client::{RawClient, RawClientError};
13use crate::util::get_request_id;
14
15use pravega_client_auth::DelegationTokenProvider;
16use pravega_client_retry::retry_async::retry_async;
17use pravega_client_retry::retry_result::RetryResult;
18use pravega_client_shared::{PravegaNodeUri, Stream as PravegaStream};
19use pravega_client_shared::{Scope, ScopedSegment, ScopedStream, Segment};
20use pravega_wire_protocol::commands::{
21 CreateTableSegmentCommand, DeleteTableSegmentCommand, ReadTableCommand, ReadTableEntriesCommand,
22 ReadTableEntriesDeltaCommand, ReadTableKeysCommand, RemoveTableKeysCommand, TableEntries, TableKey,
23 TableValue, UpdateTableEntriesCommand,
24};
25use pravega_wire_protocol::wire_commands::{Replies, Requests};
26
27use async_stream::try_stream;
28use futures::stream::Stream;
29use serde::Serialize;
30use serde_cbor::from_slice;
31use serde_cbor::to_vec;
32use snafu::Snafu;
33use tracing::{debug, info};
34
/// Version number attached to a table key; the methods in this file return it
/// alongside values and accept it as the condition for conditional updates.
pub type Version = i64;

/// Suffix appended to the user-supplied table name to build the name of the
/// stream whose segment 0 backs the table.
const KVTABLE_SUFFIX: &str = "_kvtable";
38
/// Errors returned by [`Table`] operations.
#[derive(Debug, Snafu)]
pub enum TableError {
    /// The request still failed after the retry policy was exhausted; `source`
    /// carries the last raw client error.
    #[snafu(display("Connection error while performing {}: {}", operation, source))]
    ConnectionError {
        // NOTE(review): every construction site in this file sets this to `true`.
        can_retry: bool,
        operation: String,
        source: RawClientError,
    },
    /// The segment store replied that a key does not exist
    /// (produced for a `TableKeyDoesNotExist` reply when removing keys).
    #[snafu(display("Key does not exist while performing {}: {}", operation, error_msg))]
    KeyDoesNotExist { operation: String, error_msg: String },
    /// The segment store replied `NoSuchSegment` for the table segment
    /// (produced by the delta read of entries).
    #[snafu(display("Table {} does not exist while performing {}", name, operation))]
    TableDoesNotExist { operation: String, name: String },
    /// The segment store replied `TableKeyBadVersion` to a conditional
    /// insert or remove.
    #[snafu(display(
        "Incorrect Key version observed while performing {}: {}",
        operation,
        error_msg
    ))]
    IncorrectKeyVersion { operation: String, error_msg: String },
    /// Any other reply that the operation did not expect.
    #[snafu(display("Error observed while performing {} due to {}", operation, error_msg,))]
    OperationError { operation: String, error_msg: String },
}
60
/// Client-side handle to a key-value table stored in a single table segment.
///
/// Keys and values are serialized with CBOR (`serde_cbor`) before being sent
/// to the segment store.
pub struct Table {
    // String form of the scoped segment backing this table; sent as the
    // `segment` field of every request.
    name: String,
    // Segment store endpoint resolved once when the table is created and
    // reused for all later requests.
    endpoint: PravegaNodeUri,
    // Factory used to reach the controller client, the configuration and raw clients.
    factory: ClientFactoryAsync,
    // Supplies the delegation token attached to each request.
    delegation_token_provider: DelegationTokenProvider,
}
82
83impl Table {
84 pub(crate) async fn delete(
88 scope: Scope,
89 name: String,
90 factory: ClientFactoryAsync,
91 ) -> Result<(), TableError> {
92 let segment = ScopedSegment {
93 scope,
94 stream: PravegaStream::from(format!("{}{}", name, KVTABLE_SUFFIX)),
95 segment: Segment::from(0),
96 };
97 info!("deleting table map on {:?}", segment);
98
99 let delegation_token_provider = factory
100 .create_delegation_token_provider(ScopedStream::from(&segment))
101 .await;
102 let op = "Delete table segment";
103 retry_async(factory.config().retry_policy, || {
104 delete_table_segment(&factory, &segment, &delegation_token_provider)
105 })
106 .await
107 .map_err(|e| TableError::ConnectionError {
108 can_retry: true,
109 operation: op.to_string(),
110 source: e.error,
111 })
112 .and_then(|r| match r {
113 Replies::SegmentDeleted(..) | Replies::NoSuchSegment(..) => {
114 info!("Table segment {:?} deleted", segment);
115 Ok(())
116 }
117 _ => Err(TableError::OperationError {
118 operation: op.to_string(),
119 error_msg: r.to_string(),
120 }),
121 })
122 }
123
124 pub(crate) async fn new(
125 scope: Scope,
126 name: String,
127 factory: ClientFactoryAsync,
128 ) -> Result<Table, TableError> {
129 let segment = ScopedSegment {
130 scope,
131 stream: PravegaStream::from(format!("{}{}", name, KVTABLE_SUFFIX)),
132 segment: Segment::from(0),
133 };
134 info!("creating table map on {:?}", segment);
135
136 let delegation_token_provider = factory
137 .create_delegation_token_provider(ScopedStream::from(&segment))
138 .await;
139
140 let op = "Create table segment";
141 retry_async(factory.config().retry_policy, || async {
142 let req = Requests::CreateTableSegment(CreateTableSegmentCommand {
143 request_id: get_request_id(),
144 segment: segment.to_string(),
145 delegation_token: delegation_token_provider
146 .retrieve_token(factory.controller_client())
147 .await,
148 });
149
150 let endpoint = factory
151 .controller_client()
152 .get_endpoint_for_segment(&segment)
153 .await
154 .expect("get endpoint for segment");
155 debug!("endpoint is {:?}", endpoint);
156
157 let result = factory
158 .create_raw_client_for_endpoint(endpoint.clone())
159 .send_request(&req)
160 .await;
161 match result {
162 Ok(reply) => RetryResult::Success((reply, endpoint)),
163 Err(e) => {
164 if e.is_token_expired() {
165 delegation_token_provider.signal_token_expiry();
166 debug!("auth token needs to refresh");
167 }
168 debug!("retry on error {:?}", e);
169 RetryResult::Retry(e)
170 }
171 }
172 })
173 .await
174 .map_err(|e| TableError::ConnectionError {
175 can_retry: true,
176 operation: op.to_string(),
177 source: e.error,
178 })
179 .and_then(|(r, endpoint)| match r {
180 Replies::SegmentCreated(..) | Replies::SegmentAlreadyExists(..) => {
181 info!("Table segment {:?} created", segment);
182 let table_map = Table {
183 name: segment.to_string(),
184 endpoint,
185 factory,
186 delegation_token_provider,
187 };
188 Ok(table_map)
189 }
190 _ => Err(TableError::OperationError {
191 operation: op.to_string(),
192 error_msg: r.to_string(),
193 }),
194 })
195 }
196
197 pub async fn get<K, V>(&self, k: &K) -> Result<Option<(V, Version)>, TableError>
202 where
203 K: Serialize + serde::de::DeserializeOwned,
204 V: Serialize + serde::de::DeserializeOwned,
205 {
206 let key = to_vec(k).expect("error during serialization.");
207 let read_result = self.get_raw_values(vec![key]).await;
208 read_result.map(|v| {
209 let (l, version) = &v[0];
210 if l.is_empty() {
211 None
212 } else {
213 let value: V = from_slice(l.as_slice()).expect("error during deserialization");
214 Some((value, *version))
215 }
216 })
217 }
218
219 pub async fn insert<K, V>(&self, k: &K, v: &V, offset: i64) -> Result<Version, TableError>
222 where
223 K: Serialize + serde::de::DeserializeOwned,
224 V: Serialize + serde::de::DeserializeOwned,
225 {
226 self.insert_conditionally(k, v, TableKey::KEY_NO_VERSION, offset)
228 .await
229 }
230
231 pub async fn insert_conditionally<K, V>(
238 &self,
239 k: &K,
240 v: &V,
241 key_version: Version,
242 offset: i64,
243 ) -> Result<Version, TableError>
244 where
245 K: Serialize + serde::de::DeserializeOwned,
246 V: Serialize + serde::de::DeserializeOwned,
247 {
248 let key = to_vec(k).expect("error during serialization.");
249 let val = to_vec(v).expect("error during serialization.");
250 self.insert_raw_values(vec![(key, val, key_version)], offset)
251 .await
252 .map(|versions| versions[0])
253 }
254
255 pub async fn remove<K: Serialize + serde::de::DeserializeOwned>(
257 &self,
258 k: &K,
259 offset: i64,
260 ) -> Result<(), TableError> {
261 self.remove_conditionally(k, TableKey::KEY_NO_VERSION, offset)
262 .await
263 }
264
265 pub async fn remove_conditionally<K>(
268 &self,
269 k: &K,
270 key_version: Version,
271 offset: i64,
272 ) -> Result<(), TableError>
273 where
274 K: Serialize + serde::de::DeserializeOwned,
275 {
276 let key = to_vec(k).expect("error during serialization.");
277 self.remove_raw_values(vec![(key, key_version)], offset).await
278 }
279
280 pub async fn get_all<K, V>(&self, keys: Vec<&K>) -> Result<Vec<Option<(V, Version)>>, TableError>
284 where
285 K: Serialize + serde::de::DeserializeOwned,
286 V: Serialize + serde::de::DeserializeOwned,
287 {
288 let keys_raw: Vec<Vec<u8>> = keys
289 .iter()
290 .map(|k| to_vec(*k).expect("error during serialization."))
291 .collect();
292
293 let read_result: Result<Vec<(Vec<u8>, Version)>, TableError> = self.get_raw_values(keys_raw).await;
294 read_result.map(|v| {
295 v.iter()
296 .map(|(data, version)| {
297 if data.is_empty() {
298 None
299 } else {
300 let value: V = from_slice(data.as_slice()).expect("error during deserialization");
301 Some((value, *version))
302 }
303 })
304 .collect()
305 })
306 }
307
308 pub async fn insert_all<K, V>(&self, kvps: Vec<(&K, &V)>, offset: i64) -> Result<Vec<Version>, TableError>
311 where
312 K: Serialize + serde::de::DeserializeOwned,
313 V: Serialize + serde::de::DeserializeOwned,
314 {
315 let r: Vec<(Vec<u8>, Vec<u8>, Version)> = kvps
316 .iter()
317 .map(|(k, v)| {
318 (
319 to_vec(k).expect("error during serialization."),
320 to_vec(v).expect("error during serialization."),
321 TableKey::KEY_NO_VERSION,
322 )
323 })
324 .collect();
325 self.insert_raw_values(r, offset).await
326 }
327
328 pub async fn insert_conditionally_all<K, V>(
336 &self,
337 kvps: Vec<(&K, &V, Version)>,
338 offset: i64,
339 ) -> Result<Vec<Version>, TableError>
340 where
341 K: Serialize + serde::de::DeserializeOwned,
342 V: Serialize + serde::de::DeserializeOwned,
343 {
344 let r: Vec<(Vec<u8>, Vec<u8>, Version)> = kvps
345 .iter()
346 .map(|(k, v, ver)| {
347 (
348 to_vec(k).expect("error during serialization."),
349 to_vec(v).expect("error during serialization."),
350 *ver,
351 )
352 })
353 .collect();
354 self.insert_raw_values(r, offset).await
355 }
356
357 pub async fn remove_all<K>(&self, keys: Vec<&K>, offset: i64) -> Result<(), TableError>
359 where
360 K: Serialize + serde::de::DeserializeOwned,
361 {
362 let r: Vec<(&K, Version)> = keys.iter().map(|k| (*k, TableKey::KEY_NO_VERSION)).collect();
363 self.remove_conditionally_all(r, offset).await
364 }
365
366 pub async fn remove_conditionally_all<K>(
369 &self,
370 keys: Vec<(&K, Version)>,
371 offset: i64,
372 ) -> Result<(), TableError>
373 where
374 K: Serialize + serde::de::DeserializeOwned,
375 {
376 let r: Vec<(Vec<u8>, Version)> = keys
377 .iter()
378 .map(|(k, v)| (to_vec(k).expect("error during serialization."), *v))
379 .collect();
380 self.remove_raw_values(r, offset).await
381 }
382
383 pub fn read_keys_stream<'stream, 'map: 'stream, K: 'stream>(
385 &'map self,
386 max_keys_at_once: i32,
387 ) -> impl Stream<Item = Result<(K, Version), TableError>> + 'stream
388 where
389 K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
390 {
391 try_stream! {
392 let mut token: Vec<u8> = Vec::new();
393 loop {
394 let res: (Vec<(Vec<u8>, Version)>, Vec<u8>) = self.read_keys_raw(max_keys_at_once, &token).await?;
395 let (keys, t) = res;
396 if keys.is_empty() {
397 break;
398 } else {
399 for (key_raw, version) in keys {
400 let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
401 yield (key, version)
402 }
403 token = t;
404 }
405 }
406 }
407 }
408
409 pub fn read_entries_stream<'stream, 'map: 'stream, K: 'map, V: 'map>(
412 &'map self,
413 max_entries_at_once: i32,
414 ) -> impl Stream<Item = Result<(K, V, Version), TableError>> + 'stream
415 where
416 K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
417 V: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
418 {
419 try_stream! {
420 let mut token: Vec<u8> = Vec::new();
421 loop {
422 let res: (Vec<(Vec<u8>, Vec<u8>,Version)>, Vec<u8>) = self.read_entries_raw(max_entries_at_once, &token).await?;
423 let (entries, t) = res;
424 if entries.is_empty() {
425 break;
426 } else {
427 for (key_raw, value_raw, version) in entries {
428 let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
429 let value: V = from_slice(value_raw.as_slice()).expect("error during deserialization");
430 yield (key, value, version)
431 }
432 token = t;
433 }
434 }
435 }
436 }
437
438 pub fn read_entries_stream_from_position<'stream, 'map: 'stream, K: 'map, V: 'map>(
441 &'map self,
442 max_entries_at_once: i32,
443 mut from_position: i64,
444 ) -> impl Stream<Item = Result<(K, V, Version, i64), TableError>> + 'stream
445 where
446 K: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
447 V: Serialize + serde::de::DeserializeOwned + std::marker::Unpin,
448 {
449 try_stream! {
450 loop {
451 let res: (Vec<(Vec<u8>, Vec<u8>,Version)>, i64) = self.read_entries_raw_delta(max_entries_at_once, from_position).await?;
452 let (entries, last_position) = res;
453 if entries.is_empty() {
454 break;
455 } else {
456 for (key_raw, value_raw, version) in entries {
457 let key: K = from_slice(key_raw.as_slice()).expect("error during deserialization");
458 let value: V = from_slice(value_raw.as_slice()).expect("error during deserialization");
459 yield (key, value, version, last_position)
460 }
461 from_position = last_position;
462 }
463 }
464 }
465 }
466
467 async fn get_keys<K>(
472 &self,
473 max_keys_at_once: i32,
474 token: &[u8],
475 ) -> Result<(Vec<(K, Version)>, Vec<u8>), TableError>
476 where
477 K: Serialize + serde::de::DeserializeOwned,
478 {
479 let res = self.read_keys_raw(max_keys_at_once, token).await;
480 res.map(|(keys, token)| {
481 let keys_de: Vec<(K, Version)> = keys
482 .iter()
483 .map(|(k, version)| {
484 let key: K = from_slice(k.as_slice()).expect("error during deserialization");
485 (key, *version)
486 })
487 .collect();
488 (keys_de, token)
489 })
490 }
491
492 async fn get_entries<K, V>(
497 &self,
498 max_entries_at_once: i32,
499 token: &[u8],
500 ) -> Result<(Vec<(K, V, Version)>, Vec<u8>), TableError>
501 where
502 K: Serialize + serde::de::DeserializeOwned,
503 V: Serialize + serde::de::DeserializeOwned,
504 {
505 let res = self.read_entries_raw(max_entries_at_once, token).await;
506 res.map(|(entries, token)| {
507 let entries_de: Vec<(K, V, Version)> = entries
508 .iter()
509 .map(|(k, v, version)| {
510 let key: K = from_slice(k.as_slice()).expect("error during deserialization");
511 let value: V = from_slice(v.as_slice()).expect("error during deserialization");
512 (key, value, *version)
513 })
514 .collect();
515 (entries_de, token)
516 })
517 }
518
519 async fn get_entries_delta<K, V>(
521 &self,
522 max_entries_at_once: i32,
523 from_position: i64,
524 ) -> Result<(Vec<(K, V, Version)>, i64), TableError>
525 where
526 K: Serialize + serde::de::DeserializeOwned,
527 V: Serialize + serde::de::DeserializeOwned,
528 {
529 let res = self
530 .read_entries_raw_delta(max_entries_at_once, from_position)
531 .await;
532 res.map(|(entries, token)| {
533 let entries_de: Vec<(K, V, Version)> = entries
534 .iter()
535 .map(|(k, v, version)| {
536 let key: K = from_slice(k.as_slice()).expect("error during deserialization");
537 let value: V = from_slice(v.as_slice()).expect("error during deserialization");
538 (key, value, *version)
539 })
540 .collect();
541 (entries_de, token)
542 })
543 }
544
545 async fn insert_raw_values(
548 &self,
549 kvps: Vec<(Vec<u8>, Vec<u8>, Version)>,
550 offset: i64,
551 ) -> Result<Vec<Version>, TableError> {
552 let op = "Insert into tablemap";
553
554 retry_async(self.factory.config().retry_policy, || async {
555 let entries: Vec<(TableKey, TableValue)> = kvps
556 .iter()
557 .map(|(k, v, ver)| {
558 let tk = TableKey::new(k.clone(), *ver);
559 let tv = TableValue::new(v.clone());
560 (tk, tv)
561 })
562 .collect();
563 let te = TableEntries { entries };
564
565 let req = Requests::UpdateTableEntries(UpdateTableEntriesCommand {
566 request_id: get_request_id(),
567 segment: self.name.clone(),
568 delegation_token: self
569 .delegation_token_provider
570 .retrieve_token(self.factory.controller_client())
571 .await,
572 table_entries: te,
573 table_segment_offset: offset,
574 });
575 let result = self
576 .factory
577 .create_raw_client_for_endpoint(self.endpoint.clone())
578 .send_request(&req)
579 .await;
580 match result {
581 Ok(reply) => RetryResult::Success(reply),
582 Err(e) => {
583 if e.is_token_expired() {
584 self.delegation_token_provider.signal_token_expiry();
585 info!("auth token needs to refresh");
586 }
587 info!("Table insert retry error {:?}", e);
588 RetryResult::Retry(e)
589 }
590 }
591 })
592 .await
593 .map_err(|e| TableError::ConnectionError {
594 can_retry: true,
595 operation: op.into(),
596 source: e.error,
597 })
598 .and_then(|r| match r {
599 Replies::TableEntriesUpdated(c) => Ok(c.updated_versions),
600 Replies::TableKeyBadVersion(c) => Err(TableError::IncorrectKeyVersion {
601 operation: op.into(),
602 error_msg: c.to_string(),
603 }),
604 _ => Err(TableError::OperationError {
606 operation: op.into(),
607 error_msg: r.to_string(),
608 }),
609 })
610 }
611
612 async fn get_raw_values(&self, keys: Vec<Vec<u8>>) -> Result<Vec<(Vec<u8>, Version)>, TableError> {
615 let op = "Read from tablemap";
616
617 retry_async(self.factory.config().retry_policy, || async {
618 let table_keys: Vec<TableKey> = keys
619 .iter()
620 .map(|k| TableKey::new(k.clone(), TableKey::KEY_NO_VERSION))
621 .collect();
622
623 let req = Requests::ReadTable(ReadTableCommand {
624 request_id: get_request_id(),
625 segment: self.name.clone(),
626 delegation_token: self
627 .delegation_token_provider
628 .retrieve_token(self.factory.controller_client())
629 .await,
630 keys: table_keys,
631 });
632 let result = self
633 .factory
634 .create_raw_client_for_endpoint(self.endpoint.clone())
635 .send_request(&req)
636 .await;
637 debug!("Read Response {:?}", result);
638 match result {
639 Ok(reply) => RetryResult::Success(reply),
640 Err(e) => {
641 if e.is_token_expired() {
642 self.delegation_token_provider.signal_token_expiry();
643 info!("auth token needs to refresh");
644 }
645 RetryResult::Retry(e)
646 }
647 }
648 })
649 .await
650 .map_err(|e| TableError::ConnectionError {
651 can_retry: true,
652 operation: op.into(),
653 source: e.error,
654 })
655 .and_then(|reply| match reply {
656 Replies::TableRead(c) => {
657 let v: Vec<(TableKey, TableValue)> = c.entries.entries;
658 if v.is_empty() {
659 panic!("Invalid response from the Segment store");
661 } else {
662 let result: Vec<(Vec<u8>, Version)> =
664 v.iter().map(|(l, r)| (r.data.clone(), l.key_version)).collect();
665 Ok(result)
666 }
667 }
668 _ => Err(TableError::OperationError {
669 operation: op.into(),
670 error_msg: reply.to_string(),
671 }),
672 })
673 }
674
675 async fn remove_raw_values(&self, keys: Vec<(Vec<u8>, Version)>, offset: i64) -> Result<(), TableError> {
678 let op = "Remove keys from table";
679
680 retry_async(self.factory.config().retry_policy, || async {
681 let tks: Vec<TableKey> = keys
682 .iter()
683 .map(|(k, ver)| TableKey::new(k.clone(), *ver))
684 .collect();
685
686 let req = Requests::RemoveTableKeys(RemoveTableKeysCommand {
687 request_id: get_request_id(),
688 segment: self.name.clone(),
689 delegation_token: self
690 .delegation_token_provider
691 .retrieve_token(self.factory.controller_client())
692 .await,
693 keys: tks,
694 table_segment_offset: offset,
695 });
696 let result = self
697 .factory
698 .create_raw_client_for_endpoint(self.endpoint.clone())
699 .send_request(&req)
700 .await;
701 debug!("Reply for RemoveTableKeys request {:?}", result);
702 match result {
703 Ok(reply) => RetryResult::Success(reply),
704 Err(e) => {
705 if e.is_token_expired() {
706 self.delegation_token_provider.signal_token_expiry();
707 debug!("auth token needs to refresh");
708 }
709 debug!("retry on error {:?}", e);
710 RetryResult::Retry(e)
711 }
712 }
713 })
714 .await
715 .map_err(|e| TableError::ConnectionError {
716 can_retry: true,
717 operation: op.into(),
718 source: e.error,
719 })
720 .and_then(|r| match r {
721 Replies::TableKeysRemoved(..) => Ok(()),
722 Replies::TableKeyBadVersion(c) => Err(TableError::IncorrectKeyVersion {
723 operation: op.into(),
724 error_msg: c.to_string(),
725 }),
726 Replies::TableKeyDoesNotExist(c) => Err(TableError::KeyDoesNotExist {
727 operation: op.into(),
728 error_msg: c.to_string(),
729 }),
730 _ => Err(TableError::OperationError {
731 operation: op.into(),
732 error_msg: r.to_string(),
733 }),
734 })
735 }
736
737 async fn read_keys_raw(
739 &self,
740 max_keys_at_once: i32,
741 token: &[u8],
742 ) -> Result<(Vec<(Vec<u8>, Version)>, Vec<u8>), TableError> {
743 let op = "Read keys";
744
745 retry_async(self.factory.config().retry_policy, || async {
746 let req = Requests::ReadTableKeys(ReadTableKeysCommand {
747 request_id: get_request_id(),
748 segment: self.name.clone(),
749 delegation_token: self
750 .delegation_token_provider
751 .retrieve_token(self.factory.controller_client())
752 .await,
753 suggested_key_count: max_keys_at_once,
754 continuation_token: token.to_vec(),
755 });
756 let result = self
757 .factory
758 .create_raw_client_for_endpoint(self.endpoint.clone())
759 .send_request(&req)
760 .await;
761 match result {
762 Ok(reply) => RetryResult::Success(reply),
763 Err(e) => {
764 if e.is_token_expired() {
765 self.delegation_token_provider.signal_token_expiry();
766 info!("auth token needs to refresh");
767 }
768 RetryResult::Retry(e)
769 }
770 }
771 })
772 .await
773 .map_err(|e| TableError::ConnectionError {
774 can_retry: true,
775 operation: op.into(),
776 source: e.error,
777 })
778 .and_then(|r| match r {
779 Replies::TableKeysRead(c) => {
780 let keys: Vec<(Vec<u8>, Version)> =
781 c.keys.iter().map(|k| (k.data.clone(), k.key_version)).collect();
782
783 Ok((keys, c.continuation_token))
784 }
785 _ => Err(TableError::OperationError {
786 operation: op.into(),
787 error_msg: r.to_string(),
788 }),
789 })
790 }
791
792 async fn read_entries_raw(
794 &self,
795 max_entries_at_once: i32,
796 token: &[u8],
797 ) -> Result<(Vec<(Vec<u8>, Vec<u8>, Version)>, Vec<u8>), TableError> {
798 let op = "Read entries";
799
800 retry_async(self.factory.config().retry_policy, || async {
801 let req = Requests::ReadTableEntries(ReadTableEntriesCommand {
802 request_id: get_request_id(),
803 segment: self.name.clone(),
804 delegation_token: self
805 .delegation_token_provider
806 .retrieve_token(self.factory.controller_client())
807 .await,
808 suggested_entry_count: max_entries_at_once,
809 continuation_token: token.to_vec(),
810 });
811 let result = self
812 .factory
813 .create_raw_client_for_endpoint(self.endpoint.clone())
814 .send_request(&req)
815 .await;
816 debug!("Reply for read tableEntries request {:?}", result);
817
818 match result {
819 Ok(reply) => RetryResult::Success(reply),
820 Err(e) => {
821 if e.is_token_expired() {
822 self.delegation_token_provider.signal_token_expiry();
823 info!("auth token needs to refresh");
824 }
825 RetryResult::Retry(e)
826 }
827 }
828 })
829 .await
830 .map_err(|e| TableError::ConnectionError {
831 can_retry: true,
832 operation: op.into(),
833 source: e.error,
834 })
835 .and_then(|r| {
836 match r {
837 Replies::TableEntriesRead(c) => {
838 let entries: Vec<(Vec<u8>, Vec<u8>, Version)> = c
839 .entries
840 .entries
841 .iter()
842 .map(|(k, v)| (k.data.clone(), v.data.clone(), k.key_version))
843 .collect();
844
845 Ok((entries, c.continuation_token))
846 }
847 _ => Err(TableError::OperationError {
849 operation: op.into(),
850 error_msg: r.to_string(),
851 }),
852 }
853 })
854 }
855
856 async fn read_entries_raw_delta(
859 &self,
860 max_entries_at_once: i32,
861 from_position: i64,
862 ) -> Result<(Vec<(Vec<u8>, Vec<u8>, Version)>, i64), TableError> {
863 let op = "Read entries delta";
864
865 retry_async(self.factory.config().retry_policy, || async {
866 let req = Requests::ReadTableEntriesDelta(ReadTableEntriesDeltaCommand {
867 request_id: get_request_id(),
868 segment: self.name.clone(),
869 delegation_token: self
870 .delegation_token_provider
871 .retrieve_token(self.factory.controller_client())
872 .await,
873 from_position,
874 suggested_entry_count: max_entries_at_once,
875 });
876 let result = self
877 .factory
878 .create_raw_client_for_endpoint(self.endpoint.clone())
879 .send_request(&req)
880 .await;
881
882 match result {
883 Ok(reply) => RetryResult::Success(reply),
884 Err(e) => {
885 if e.is_token_expired() {
886 self.delegation_token_provider.signal_token_expiry();
887 info!("auth token needs to refresh");
888 }
889 RetryResult::Retry(e)
890 }
891 }
892 })
893 .await
894 .map_err(|e| TableError::ConnectionError {
895 can_retry: true,
896 operation: op.into(),
897 source: e.error,
898 })
899 .and_then(|r| {
900 match r {
901 Replies::TableEntriesDeltaRead(c) => {
902 let entries: Vec<(Vec<u8>, Vec<u8>, Version)> = c
903 .entries
904 .entries
905 .iter()
906 .map(|(k, v)| (k.data.clone(), v.data.clone(), k.key_version))
907 .collect();
908
909 Ok((entries, c.last_position))
910 }
911 Replies::NoSuchSegment(c) => {
912 debug!("Received NoSuchSegment, the table segment is deleted {:?}", c);
913 Err(TableError::TableDoesNotExist {
914 operation: op.into(),
915 name: c.segment,
916 })
917 }
918 _ => Err(TableError::OperationError {
920 operation: op.into(),
921 error_msg: "Unexpected response received from Segment Store".to_string(),
922 }),
923 }
924 })
925 }
926}
927
928async fn delete_table_segment(
929 factory: &ClientFactoryAsync,
930 segment: &ScopedSegment,
931 delegation_token_provider: &DelegationTokenProvider,
932) -> RetryResult<Replies, RawClientError> {
933 let req = Requests::DeleteTableSegment(DeleteTableSegmentCommand {
934 request_id: get_request_id(),
935 segment: segment.to_string(),
936 must_be_empty: false,
937 delegation_token: delegation_token_provider
938 .retrieve_token(factory.controller_client())
939 .await,
940 });
941
942 let endpoint = factory
943 .controller_client()
944 .get_endpoint_for_segment(segment)
945 .await
946 .expect("get endpoint for segment");
947 debug!("endpoint is {:?}", endpoint);
948
949 let result = factory
950 .create_raw_client_for_endpoint(endpoint.clone())
951 .send_request(&req)
952 .await;
953 match result {
954 Ok(reply) => RetryResult::Success(reply),
955 Err(e) => {
956 if e.is_token_expired() {
957 delegation_token_provider.signal_token_expiry();
958 debug!("auth token needs to refresh");
959 }
960 debug!("retry on error {:?}", e);
961 RetryResult::Retry(e)
962 }
963 }
964}
965
966#[cfg(test)]
967mod test {
968 use super::*;
969 use crate::client_factory::ClientFactory;
970 use pravega_client_config::connection_type::{ConnectionType, MockType};
971 use pravega_client_config::ClientConfigBuilder;
972 use pravega_client_shared::PravegaNodeUri;
973 use tokio::runtime::Runtime;
974
/// Unconditional inserts against the mock connection bump the key version
/// from 0 to 1, and an unconditional remove makes the key unreadable.
#[test]
fn test_table_map_unconditional_insert_and_remove() {
    let mut rt = Runtime::new().unwrap();
    let table_map = create_table_map(&mut rt);
    let key = "key".to_string();
    let value = "value".to_string();

    // First unconditional insert.
    let first = rt
        .block_on(table_map.insert(&key, &value, -1))
        .expect("unconditionally insert into table map");
    assert_eq!(first, 0);

    // Second unconditional insert of the same key.
    let second = rt
        .block_on(table_map.insert(&key, &value, -1))
        .expect("unconditionally insert into table map");
    assert_eq!(second, 1);

    // Unconditional remove.
    rt.block_on(table_map.remove(&key, -1)).expect("remove key");

    // The key must no longer be readable.
    let lookup: Option<(String, Version)> = rt.block_on(table_map.get(&key)).expect("remove key");
    assert!(lookup.is_none());
}
1002
/// Conditional inserts succeed only with the current key version, and a
/// conditional remove with the current version deletes the key.
#[test]
fn test_table_map_conditional_insert_and_remove() {
    let mut rt = Runtime::new().unwrap();
    let table_map = create_table_map(&mut rt);
    let key = "key".to_string();
    let value = "value".to_string();

    // No version condition: the key is created with version 0.
    let created = rt
        .block_on(table_map.insert_conditionally(&key, &value, -1, -1))
        .expect("unconditionally insert into table map");
    assert_eq!(created, 0);

    // Matching version 0: the update succeeds and yields version 1.
    let updated = rt
        .block_on(table_map.insert_conditionally(&key, &value, 0, -1))
        .expect("conditionally insert into table map");
    assert_eq!(updated, 1);

    // Stale version 0: the update is rejected.
    let stale = rt.block_on(table_map.insert_conditionally(&key, &value, 0, -1));
    assert!(stale.is_err());

    // Matching version 1: the remove succeeds.
    let removed = rt.block_on(table_map.remove_conditionally(&key, 1, -1));
    assert!(removed.is_ok());

    // The key must no longer be readable.
    let lookup: Option<(String, Version)> = rt.block_on(table_map.get(&key)).expect("remove key");
    assert!(lookup.is_none());
}
1032
/// Batch insert of two keys followed by a conditional batch remove leaves
/// neither key readable.
#[test]
fn test_table_map_insert_remove_all() {
    let mut rt = Runtime::new().unwrap();
    let table_map = create_table_map(&mut rt);

    let k1 = "k1".to_string();
    let v1 = "v1".to_string();
    let k2 = "k2".to_string();
    let v2 = "v2".to_string();

    // Insert both entries without a version condition.
    let kvs = vec![(&k1, &v1, -1), (&k2, &v2, -1)];
    let versions = rt
        .block_on(table_map.insert_conditionally_all(kvs, -1))
        .expect("unconditionally insert all into table map");
    assert_eq!(versions, vec![0, 0]);

    // Remove both entries, conditioned on the versions just returned.
    let ks = vec![(&k1, 0), (&k2, 0)];
    rt.block_on(table_map.remove_conditionally_all(ks, -1))
        .expect("conditionally remove all from table map");

    // Neither key must be readable any more.
    for name in ["k1", "k2"].iter() {
        let lookup: Option<(String, Version)> = rt
            .block_on(table_map.get(&name.to_string()))
            .expect("remove key");
        assert!(lookup.is_none());
    }
}
1066
1067 fn create_table_map(rt: &mut Runtime) -> Table {
1069 let config = ClientConfigBuilder::default()
1070 .connection_type(ConnectionType::Mock(MockType::Happy))
1071 .mock(true)
1072 .controller_uri(PravegaNodeUri::from("127.0.0.2:9091"))
1073 .build()
1074 .unwrap();
1075 let factory = ClientFactory::new(config);
1076 let scope = Scope {
1077 name: "tablemapScope".to_string(),
1078 };
1079 rt.block_on(factory.create_table(scope, "tablemap".to_string()))
1080 }
1081}