1#[macro_use]
4extern crate failure;
5
6#[macro_use]
7extern crate fehler;
8
9#[macro_use]
10extern crate lazy_static;
11
12pub use futures_retry::{ErrorHandler, RetryPolicy};
13
14use bytes::Bytes;
15use futures::future;
16use futures::{Stream, TryStreamExt};
17use futures_retry::StreamRetryExt;
18use jsonwebtoken::EncodingKey;
19use reqwest::header::{self, HeaderMap, HeaderValue};
20use reqwest::{Body, Client, ClientBuilder, StatusCode, Url};
21use reqwest_eventsource::{Error as EventsourceError, RequestBuilderExt};
22use serde::{de::DeserializeOwned, ser::Serializer, Serialize};
23use std::fmt;
24use std::time::{Duration, SystemTime, UNIX_EPOCH};
25
26cfg_if::cfg_if! {
27 if #[cfg(feature = "flexbuffers")] {
28 use flexbuffers as serde_mod;
29 } else {
30 use serde_json as serde_mod;
31 }
32}
33
34#[cfg(feature = "rustls-tls")]
35pub use reqwest::Certificate;
36
37lazy_static! {
38 static ref URL_SET: percent_encoding::AsciiSet = percent_encoding::CONTROLS
39 .add(b'/')
40 .add(b'#')
41 .add(b' ')
42 .add(b'?')
43 .add(b'%');
44}
45
46#[derive(Fail, Debug)]
48pub enum Error {
49 #[fail(display = "invalid url")]
50 InvalidUrl,
51 #[fail(display = "invalid client")]
52 InvalidClient(reqwest::Error),
53 #[fail(display = "invalid response")]
54 InvalidResponse,
55 #[fail(display = "invalid request: {}", _0)]
56 InvalidRequest(reqwest::Error),
57 #[fail(display = "unauthorized")]
58 Unauthorized,
59 #[fail(display = "conflict")]
60 Conflict,
61 #[fail(display = "not found")]
62 NotFound,
63 #[fail(display = "non numeric value")]
64 NonNumericValue,
65 #[fail(display = "bad request")]
66 BadRequest,
67 #[fail(display = "serialize error")]
68 SerializeError,
69 #[fail(display = "deserialize error")]
70 DeserializeError,
71 #[fail(display = "invalid JWT key")]
72 InvalidJWTKey,
73}
74
75#[derive(Fail, Debug)]
77pub enum NotificationError<E>
78where
79 E: fmt::Display + fmt::Debug + Send + Sync + 'static,
80{
81 #[fail(display = "transport error after {} attempts: {}", _1, _0)]
82 Other(E, usize),
83 #[fail(display = "deserialize error")]
84 DeserializeError(Bytes),
85}
86
/// Outcome of a successful PUT, as reported by the server's status code.
#[repr(u16)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PutStatus {
    /// The server answered `200 OK`.
    Ok,
    /// The server answered `201 Created`.
    Created,
}
94
/// Operation requested in a PATCH body; serialized as its lowercase name.
#[derive(Clone, Copy, Debug)]
enum Operation {
    /// Serialized as `"lock"`.
    Lock,
    /// Serialized as `"unlock"`.
    Unlock,
    /// Serialized as `"increment"`.
    Increment,
    /// Serialized as `"decrement"`.
    Decrement,
    /// Serialized as `"ttl"`.
    TTL,
}
103
104#[derive(Debug, Clone, Default, Serialize)]
105struct Claims {
106 sub: String,
107 iss: String,
108 iat: i64,
109 exp: i64,
110}
111
112#[derive(Debug, Clone, Serialize)]
113struct PatchValue {
114 operation: Operation,
115 value: Option<String>,
116}
117
118struct NotificationsErrorHandler<F>
119where
120 F: ErrorHandler<reqwest::Error>,
121{
122 inner: F,
123}
124
/// A single change notification received from the server.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Notification<T> {
    /// Key the notification refers to (the percent-decoded event name).
    pub key: String,
    /// Payload carried by the notification.
    pub data: T,
}
131
132#[derive(Clone, Debug)]
134pub struct LucidClient {
135 client: Client,
136 url: Url,
137 jwt_key: Option<EncodingKey>,
138}
139
140#[derive(Debug)]
142pub struct Builder<'a> {
143 client: ClientBuilder,
144 url: &'a str,
145 jwt_key: Option<EncodingKey>,
146}
147
148impl<'a> Builder<'a> {
149 pub fn new<U: AsRef<str> + ?Sized>(base_url: &'a U) -> Self {
151 Self {
152 client: ClientBuilder::new(),
153 url: base_url.as_ref(),
154 jwt_key: None,
155 }
156 }
157
158 pub fn add_jwt_key<T: AsRef<[u8]> + ?Sized>(mut self, key: &T) -> Self {
160 self.jwt_key = Some(EncodingKey::from_secret(key.as_ref()));
161 self
162 }
163
164 #[cfg(feature = "rustls-tls")]
165 pub fn add_root_certificate(mut self, cert: Certificate) -> Self {
167 self.client = self.client.add_root_certificate(cert);
168 self
169 }
170
171 #[throws]
173 pub fn build(self) -> LucidClient {
174 LucidClient {
175 client: self.client.build().map_err(Error::InvalidClient)?,
176 url: Url::parse(self.url).map_err(|_| Error::InvalidUrl)?,
177 jwt_key: self.jwt_key,
178 }
179 }
180}
181
182impl LucidClient {
183 #[throws]
185 pub fn new<U: AsRef<str> + ?Sized>(base_url: &U) -> Self {
186 Builder::new(base_url).build()?
187 }
188
189 pub fn builder<'a, U: AsRef<str> + ?Sized>(base_url: &'a U) -> Builder<'a> {
191 Builder::new(base_url)
192 }
193
194 #[throws]
196 pub async fn health_check(&self) {
197 let url = self.url.join("health").map_err(|_| Error::InvalidUrl)?;
198 let res = self
199 .client
200 .head(url)
201 .send()
202 .await
203 .map_err(Error::InvalidRequest)?;
204 match res.status() {
205 StatusCode::OK => (),
206 _ => throw!(Error::InvalidResponse),
207 }
208 }
209
210 #[throws]
212 pub async fn put_raw<K: AsRef<str> + ?Sized, V: Into<Body>>(
213 &self,
214 key: &K,
215 value: V,
216 ) -> PutStatus {
217 let res = self
218 .client
219 .put(self.key_url(key)?)
220 .headers(self.authorization()?)
221 .body(value)
222 .send()
223 .await
224 .map_err(Error::InvalidRequest)?;
225 match res.status() {
226 StatusCode::OK => PutStatus::Ok,
227 StatusCode::CREATED => PutStatus::Created,
228 StatusCode::UNAUTHORIZED => throw!(Error::Unauthorized),
229 StatusCode::CONFLICT => throw!(Error::Conflict),
230 _ => throw!(Error::InvalidResponse),
231 }
232 }
233
234 #[throws]
236 pub async fn get_raw<K: AsRef<str> + ?Sized>(&self, key: &K) -> Option<Bytes> {
237 let res = self
238 .client
239 .get(self.key_url(key)?)
240 .headers(self.authorization()?)
241 .send()
242 .await
243 .map_err(Error::InvalidRequest)?;
244 match res.status() {
245 StatusCode::OK => Some(res.bytes().await.map_err(|_| Error::InvalidResponse)?),
246 StatusCode::NOT_FOUND => None,
247 _ => throw!(Error::InvalidResponse),
248 }
249 }
250
251 #[throws]
253 pub async fn delete<K: AsRef<str> + ?Sized>(&self, key: &K) -> bool {
254 let res = self
255 .client
256 .delete(self.key_url(key)?)
257 .headers(self.authorization()?)
258 .send()
259 .await
260 .map_err(Error::InvalidRequest)?;
261 match res.status() {
262 StatusCode::OK | StatusCode::NO_CONTENT => true,
263 StatusCode::NOT_FOUND => false,
264 StatusCode::UNAUTHORIZED => throw!(Error::Unauthorized),
265 _ => throw!(Error::InvalidResponse),
266 }
267 }
268
269 #[throws]
271 pub async fn exists<K: AsRef<str> + ?Sized>(&self, key: &K) -> bool {
272 let res = self
273 .client
274 .head(self.key_url(key)?)
275 .headers(self.authorization()?)
276 .send()
277 .await
278 .map_err(Error::InvalidRequest)?;
279 match res.status() {
280 StatusCode::OK | StatusCode::NO_CONTENT => true,
281 StatusCode::NOT_FOUND => false,
282 StatusCode::UNAUTHORIZED => throw!(Error::Unauthorized),
283 _ => throw!(Error::InvalidResponse),
284 }
285 }
286
287 #[throws]
289 pub async fn put<K: AsRef<str> + ?Sized, V: Serialize + ?Sized>(
290 &self,
291 key: &K,
292 value: &V,
293 ) -> PutStatus {
294 self.put_raw(
295 key,
296 serde_mod::to_vec(value).map_err(|_| Error::SerializeError)?,
297 )
298 .await?
299 }
300
301 #[throws]
303 pub async fn get<K: AsRef<str> + ?Sized, V: DeserializeOwned>(&self, key: &K) -> Option<V> {
304 let bytes = self.get_raw(key).await?;
305 match bytes {
306 None => None,
307 Some(bytes) => {
308 Some(serde_mod::from_slice(bytes.as_ref()).map_err(|_| Error::DeserializeError)?)
309 }
310 }
311 }
312
313 #[throws]
315 pub async fn lock<K: AsRef<str> + ?Sized>(&self, key: &K) -> bool {
316 match self
317 .patch(key, &PatchValue::new(Operation::Lock, None))
318 .await
319 {
320 Ok(_) => true,
321 Err(err) => match err {
322 Error::Conflict => false,
323 err => throw!(err),
324 },
325 }
326 }
327
328 #[throws]
330 pub async fn unlock<K: AsRef<str> + ?Sized>(&self, key: &K) -> bool {
331 match self
332 .patch(key, &PatchValue::new(Operation::Unlock, None))
333 .await
334 {
335 Ok(_) => true,
336 Err(err) => match err {
337 Error::Conflict => false,
338 err => throw!(err),
339 },
340 }
341 }
342
343 #[throws]
346 pub async fn increment<K: AsRef<str> + ?Sized>(&self, key: &K) {
347 self.patch(key, &PatchValue::new(Operation::Increment, None))
348 .await
349 .map_err(|err| match err {
350 Error::BadRequest => Error::NonNumericValue,
351 err => err,
352 })?
353 }
354
355 #[throws]
357 pub async fn decrement<K: AsRef<str> + ?Sized>(&self, key: &K) {
358 self.patch(key, &PatchValue::new(Operation::Decrement, None))
359 .await
360 .map_err(|err| match err {
361 Error::BadRequest => Error::NonNumericValue,
362 err => err,
363 })?
364 }
365
366 #[throws]
368 pub async fn ttl<K: AsRef<str> + ?Sized>(&self, key: &K, duration: Duration) {
369 self.patch(
370 key,
371 &PatchValue::new(Operation::TTL, Some(duration.as_secs().to_string())),
372 )
373 .await?
374 }
375
376 #[throws]
378 pub async fn notifications_raw<F, E>(
379 &self,
380 handler: F,
381 ) -> impl Stream<Item = Result<Notification<Bytes>, NotificationError<E>>>
382 where
383 F: ErrorHandler<reqwest::Error, OutError = E>,
384 E: fmt::Display + fmt::Debug + Send + Sync + 'static,
385 {
386 let url = self
387 .url
388 .join("notifications")
389 .map_err(|_| Error::InvalidUrl)?;
390 self.client
391 .get(url)
392 .headers(self.authorization()?)
393 .eventsource()
394 .unwrap()
395 .retry(NotificationsErrorHandler::new(handler))
396 .map_ok(|(event, _attempt)| Notification {
397 key: percent_encoding::percent_decode_str(&event.event.unwrap())
398 .decode_utf8_lossy()
399 .to_string(),
400 data: event.data.into(),
401 })
402 .map_err(|(err, usize)| NotificationError::Other(err, usize))
403 }
404
405 #[throws]
407 pub async fn notifications<F, T, E>(
408 &self,
409 handler: F,
410 ) -> impl Stream<Item = Result<Notification<T>, NotificationError<E>>>
411 where
412 F: ErrorHandler<reqwest::Error, OutError = E>,
413 E: fmt::Display + fmt::Debug + Send + Sync + 'static,
414 T: DeserializeOwned,
415 {
416 self.notifications_raw(handler)
417 .await?
418 .and_then(|notification| {
419 future::ready(
420 serde_mod::from_slice(¬ification.data)
421 .map_err(|_| NotificationError::DeserializeError(notification.data.clone()))
422 .and_then(|data| {
423 Ok(Notification {
424 key: notification.key,
425 data,
426 })
427 }),
428 )
429 })
430 }
431
432 #[throws]
433 async fn patch<K: AsRef<str> + ?Sized>(&self, key: &K, value: &PatchValue) {
434 let res = self
435 .client
436 .patch(self.key_url(key)?)
437 .headers(self.authorization()?)
438 .body(serde_json::to_string(&value).map_err(|_| Error::SerializeError)?)
439 .send()
440 .await
441 .map_err(Error::InvalidRequest)?;
442 match res.status() {
443 StatusCode::OK | StatusCode::NO_CONTENT => (),
444 StatusCode::NOT_FOUND => throw!(Error::NotFound),
445 StatusCode::CONFLICT => throw!(Error::Conflict),
446 StatusCode::BAD_REQUEST => throw!(Error::BadRequest),
447 StatusCode::UNAUTHORIZED => throw!(Error::Unauthorized),
448 _ => throw!(Error::InvalidResponse),
449 }
450 }
451
452 #[inline]
453 #[throws]
454 fn key_url<K: AsRef<str>>(&self, key: K) -> Url {
455 let encoded = percent_encoding::utf8_percent_encode(key.as_ref(), &URL_SET).to_string();
456 self.url
457 .join(&format!("api/kv/{}", encoded))
458 .map_err(|_| Error::InvalidUrl)?
459 }
460
461 #[inline]
462 #[throws]
463 fn authorization(&self) -> HeaderMap<HeaderValue> {
464 let mut headers = HeaderMap::default();
465 let key = if let Some(ref key) = self.jwt_key {
466 key
467 } else {
468 return headers;
469 };
470
471 let iat = match SystemTime::now().duration_since(UNIX_EPOCH) {
472 Ok(n) => n.as_secs() as i64,
473 Err(_) => panic!("SystemTime before UNIX EPOCH!"),
474 };
475 let claims = Claims {
476 iat,
477 exp: iat + 60,
478 ..Default::default()
479 };
480 let token = jsonwebtoken::encode(&jsonwebtoken::Header::default(), &claims, &key)
481 .map_err(|_| Error::InvalidJWTKey)?;
482
483 headers.append(
484 header::AUTHORIZATION,
485 format!("Bearer {}", token)
486 .parse()
487 .map_err(|_| Error::InvalidJWTKey)?,
488 );
489 headers
490 }
491}
492
493impl Operation {
494 #[inline]
495 fn as_str(self) -> &'static str {
496 match self {
497 Operation::Lock => "lock",
498 Operation::Unlock => "unlock",
499 Operation::Increment => "increment",
500 Operation::Decrement => "decrement",
501 Operation::TTL => "ttl",
502 }
503 }
504}
505
506impl PatchValue {
507 #[inline]
508 fn new(operation: Operation, value: Option<String>) -> Self {
509 Self { operation, value }
510 }
511}
512
513impl Serialize for Operation {
514 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
515 where
516 S: Serializer,
517 {
518 serializer.serialize_str(self.as_str())
519 }
520}
521
522impl<F> NotificationsErrorHandler<F>
523where
524 F: ErrorHandler<reqwest::Error>,
525{
526 fn new(inner: F) -> Self {
527 Self { inner }
528 }
529}
530
531impl<F> ErrorHandler<EventsourceError<reqwest::Error>> for NotificationsErrorHandler<F>
532where
533 F: ErrorHandler<reqwest::Error>,
534{
535 type OutError = F::OutError;
536
537 fn handle(
538 &mut self,
539 attempt: usize,
540 err: EventsourceError<reqwest::Error>,
541 ) -> RetryPolicy<Self::OutError> {
542 match err {
543 EventsourceError::Parse(_) => {
544 RetryPolicy::Repeat
546 }
547 EventsourceError::Transport(err) => self.inner.handle(attempt, err),
548 }
549 }
550}
551
// These tests issue real HTTP requests: they expect a server listening on
// `http://localhost:7020` (or `https://localhost:7021` with the `rustls-tls`
// feature) that accepts JWTs signed with the secret `"secret"`.
#[cfg(test)]
mod tests {
    use super::*;

    use futures::stream::StreamExt;
    use serde::Deserialize;

    /// Sample value used by the serialization round-trip tests.
    #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
    struct TestStruct {
        a: u32,
        b: String,
        c: Vec<u8>,
    }

    /// Builds the client shared by all tests.
    #[throws]
    fn client() -> LucidClient {
        // `builder` is only reassigned when the `rustls-tls` feature is on.
        #[allow(unused_mut, unused_assignments)]
        let mut builder = LucidClient::builder("http://localhost:7020");
        #[cfg(feature = "rustls-tls")]
        {
            builder = LucidClient::builder("https://localhost:7021");
            let ca_cert = Certificate::from_pem(
                std::fs::read("test_assets/ssl/ca-cert.pem")
                    .unwrap()
                    .as_ref(),
            )
            .unwrap();
            builder = builder.add_root_certificate(ca_cert);
        }
        builder.add_jwt_key("secret").build()?
    }

    #[test]
    #[throws]
    fn build() {
        LucidClient::new("http://localhost:7020")?;
        client()?;
    }

    #[tokio::test]
    async fn put_raw() -> Result<(), Error> {
        let client = client()?;
        client.put_raw("put_raw", "value1").await?;
        Ok(())
    }

    #[tokio::test]
    async fn put_raw_bytes() -> Result<(), Error> {
        let client = client()?;
        client
            .put_raw::<_, &[u8]>("put_raw_bytes", &[0, 1, 2, 3, 4])
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn get_raw() -> Result<(), Error> {
        let client = client()?;
        let test_value = "value1";
        client.put_raw("get_raw", test_value).await?;
        let db_value = client.get_raw("get_raw").await?;
        assert_eq!(
            test_value,
            String::from_utf8_lossy(db_value.unwrap().as_ref())
        );
        Ok(())
    }

    #[tokio::test]
    async fn update_raw() -> Result<(), Error> {
        let client = client()?;
        let key = "update_raw";

        let test_value1 = "value1";
        client.put_raw(key, test_value1).await?;
        let db_value = client.get_raw(key).await?;
        assert_eq!(
            test_value1,
            String::from_utf8_lossy(db_value.unwrap().as_ref())
        );

        let test_value2 = "value2";
        client.put_raw(key, test_value2).await?;
        let db_value = client.get_raw(key).await?;
        assert_eq!(
            test_value2,
            String::from_utf8_lossy(db_value.unwrap().as_ref())
        );

        Ok(())
    }

    #[tokio::test]
    async fn delete_missing() -> Result<(), Error> {
        let client = client()?;
        assert!(!client.delete("delete_missing").await?);
        Ok(())
    }

    #[tokio::test]
    async fn delete() -> Result<(), Error> {
        let client = client()?;
        let key = "delete";

        let test_value = "value";
        client.put_raw(key, test_value).await?;
        let db_value = client.get_raw(key).await?;
        assert_eq!(
            test_value,
            String::from_utf8_lossy(db_value.unwrap().as_ref())
        );

        assert!(client.delete(key).await?);
        let db_value = client.get_raw(key).await?;
        assert!(db_value.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn exists_false() -> Result<(), Error> {
        let client = client()?;
        assert!(!client.exists("exists_false").await?);
        Ok(())
    }

    #[tokio::test]
    async fn exists_true() -> Result<(), Error> {
        let client = client()?;
        client.put_raw("exists_true", "value").await?;
        assert!(client.exists("exists_true").await?);
        Ok(())
    }

    #[tokio::test]
    async fn lock_unlock() -> Result<(), Error> {
        let client = client()?;
        let key = "lock_unlock";

        client.put_raw(key, "value").await?;
        assert!(!client.unlock(key).await?);
        assert!(client.lock(key).await?);
        assert!(!client.lock(key).await?);
        assert!(client.unlock(key).await?);
        assert!(!client.unlock(key).await?);

        Ok(())
    }

    #[tokio::test]
    async fn missing_lock() -> Result<(), Error> {
        let client = client()?;
        let key = "missing_lock";

        assert!(matches!(client.unlock(key).await, Err(Error::NotFound)));
        assert!(matches!(client.lock(key).await, Err(Error::NotFound)));

        Ok(())
    }

    #[tokio::test]
    async fn increment_decrement() -> Result<(), Error> {
        let client = client()?;
        let key = "increment_decrement";

        client.put_raw(key, "0").await?;
        assert_eq!(
            "0",
            String::from_utf8_lossy(client.get_raw(key).await?.unwrap().as_ref())
        );
        client.increment(key).await?;
        assert_eq!(
            "1",
            String::from_utf8_lossy(client.get_raw(key).await?.unwrap().as_ref())
        );
        client.decrement(key).await?;
        assert_eq!(
            "0",
            String::from_utf8_lossy(client.get_raw(key).await?.unwrap().as_ref())
        );
        client.decrement(key).await?;
        assert_eq!(
            "-1",
            String::from_utf8_lossy(client.get_raw(key).await?.unwrap().as_ref())
        );
        client.increment(key).await?;
        assert_eq!(
            "0",
            String::from_utf8_lossy(client.get_raw(key).await?.unwrap().as_ref())
        );

        Ok(())
    }

    #[tokio::test]
    async fn non_numeric_increment_decrement() -> Result<(), Error> {
        let client = client()?;
        let key = "non_numeric_increment_decrement";

        client.put_raw(key, "cool").await?;
        assert!(matches!(
            client.increment(key).await,
            Err(Error::NonNumericValue)
        ));
        assert!(matches!(
            client.decrement(key).await,
            Err(Error::NonNumericValue)
        ));

        Ok(())
    }

    #[tokio::test]
    async fn ttl() -> Result<(), Error> {
        let client = client()?;
        let key = "ttl";

        client.put_raw(key, "cool").await?;
        client.ttl(key, Duration::from_secs(3)).await?;
        assert_eq!(
            "cool",
            String::from_utf8_lossy(client.get_raw(key).await?.unwrap().as_ref())
        );

        Ok(())
    }

    #[tokio::test]
    async fn put() -> Result<(), Error> {
        let client = client()?;
        let value = TestStruct {
            a: 1,
            b: "cool".to_string(),
            c: vec![1, 2, 3],
        };
        client.put("put", &value).await?;
        Ok(())
    }

    #[tokio::test]
    async fn get() -> Result<(), Error> {
        let client = client()?;
        let test_value = TestStruct {
            a: 1,
            b: "cool".to_string(),
            c: vec![1, 2, 3],
        };
        client.put("get", &test_value).await?;
        let db_value = client.get("get").await?;
        assert_eq!(Some(test_value), db_value);
        Ok(())
    }

    #[tokio::test]
    async fn health_check() -> Result<(), Error> {
        let client = client()?;
        client.health_check().await?;
        // A second client pointed at port 9999 is expected to fail the check.
        let client = LucidClient::new("http://localhost:9999")?;
        assert!(client.health_check().await.is_err());
        Ok(())
    }

    #[tokio::test]
    async fn notifications_raw() -> Result<(), Error> {
        let client = client()?;
        let key = "notifications_raw";
        client.put_raw(key, "value1").await?;
        let mut stream = client
            .clone()
            .notifications_raw(|err| RetryPolicy::ForwardError(err))
            .await?;
        // Poll the stream and write the new value concurrently.
        let (next, _) = tokio::join!(stream.next(), client.put_raw(key, "value2"));
        assert_eq!(
            next.unwrap().unwrap(),
            Notification {
                key: key.to_string(),
                data: "value2".into()
            }
        );
        Ok(())
    }

    #[tokio::test]
    async fn notifications() -> Result<(), Error> {
        let client = client()?;
        let key = "notifications";
        let test_value1 = TestStruct {
            a: 1,
            b: "value1".to_string(),
            c: vec![1, 2, 3],
        };
        let test_value2 = TestStruct {
            a: 2,
            b: "value2".to_string(),
            c: vec![4, 5, 6],
        };
        client.put(key, &test_value1).await?;
        let mut stream = client
            .clone()
            .notifications(|err| RetryPolicy::ForwardError(err))
            .await?;
        // Poll the stream and write the new value concurrently.
        let (next, _) = tokio::join!(stream.next(), client.put(key, &test_value2));
        assert_eq!(
            next.unwrap().unwrap(),
            Notification {
                key: key.to_string(),
                data: test_value2
            }
        );
        Ok(())
    }
}