Skip to main content

lucid_kv_client/
lib.rs

1//! A simple Client for the Lucid KV
2
3#[macro_use]
4extern crate failure;
5
6#[macro_use]
7extern crate fehler;
8
9#[macro_use]
10extern crate lazy_static;
11
12pub use futures_retry::{ErrorHandler, RetryPolicy};
13
14use bytes::Bytes;
15use futures::future;
16use futures::{Stream, TryStreamExt};
17use futures_retry::StreamRetryExt;
18use jsonwebtoken::EncodingKey;
19use reqwest::header::{self, HeaderMap, HeaderValue};
20use reqwest::{Body, Client, ClientBuilder, StatusCode, Url};
21use reqwest_eventsource::{Error as EventsourceError, RequestBuilderExt};
22use serde::{de::DeserializeOwned, ser::Serializer, Serialize};
23use std::fmt;
24use std::time::{Duration, SystemTime, UNIX_EPOCH};
25
26cfg_if::cfg_if! {
27    if #[cfg(feature = "flexbuffers")] {
28        use flexbuffers as serde_mod;
29    } else {
30        use serde_json as serde_mod;
31    }
32}
33
34#[cfg(feature = "rustls-tls")]
35pub use reqwest::Certificate;
36
37lazy_static! {
38    static ref URL_SET: percent_encoding::AsciiSet = percent_encoding::CONTROLS
39        .add(b'/')
40        .add(b'#')
41        .add(b' ')
42        .add(b'?')
43        .add(b'%');
44}
45
46/// Errors when doing Client operations
47#[derive(Fail, Debug)]
48pub enum Error {
49    #[fail(display = "invalid url")]
50    InvalidUrl,
51    #[fail(display = "invalid client")]
52    InvalidClient(reqwest::Error),
53    #[fail(display = "invalid response")]
54    InvalidResponse,
55    #[fail(display = "invalid request: {}", _0)]
56    InvalidRequest(reqwest::Error),
57    #[fail(display = "unauthorized")]
58    Unauthorized,
59    #[fail(display = "conflict")]
60    Conflict,
61    #[fail(display = "not found")]
62    NotFound,
63    #[fail(display = "non numeric value")]
64    NonNumericValue,
65    #[fail(display = "bad request")]
66    BadRequest,
67    #[fail(display = "serialize error")]
68    SerializeError,
69    #[fail(display = "deserialize error")]
70    DeserializeError,
71    #[fail(display = "invalid JWT key")]
72    InvalidJWTKey,
73}
74
75/// Errors when retrieving Notifications
76#[derive(Fail, Debug)]
77pub enum NotificationError<E>
78where
79    E: fmt::Display + fmt::Debug + Send + Sync + 'static,
80{
81    #[fail(display = "transport error after {} attempts: {}", _1, _0)]
82    Other(E, usize),
83    #[fail(display = "deserialize error")]
84    DeserializeError(Bytes),
85}
86
/// Outcome of a successful put: tells whether the key already existed
#[repr(u16)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PutStatus {
    /// The server replied `200 OK`
    Ok,
    /// The server replied `201 Created`
    Created,
}
94
/// The operations that can be sent in a patch request body
#[derive(Clone, Copy, Debug)]
enum Operation {
    /// Serialized as `"lock"`
    Lock,
    /// Serialized as `"unlock"`
    Unlock,
    /// Serialized as `"increment"`
    Increment,
    /// Serialized as `"decrement"`
    Decrement,
    /// Serialized as `"ttl"`
    TTL,
}
103
104#[derive(Debug, Clone, Default, Serialize)]
105struct Claims {
106    sub: String,
107    iss: String,
108    iat: i64,
109    exp: i64,
110}
111
112#[derive(Debug, Clone, Serialize)]
113struct PatchValue {
114    operation: Operation,
115    value: Option<String>,
116}
117
118struct NotificationsErrorHandler<F>
119where
120    F: ErrorHandler<reqwest::Error>,
121{
122    inner: F,
123}
124
/// An event emitted when the value stored under a key changes
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Notification<T> {
    /// The key whose value changed
    pub key: String,
    /// The payload delivered with the event
    pub data: T,
}
131
132/// The main Client
133#[derive(Clone, Debug)]
134pub struct LucidClient {
135    client: Client,
136    url: Url,
137    jwt_key: Option<EncodingKey>,
138}
139
140/// A builder for adding custom options to the LucidClient
141#[derive(Debug)]
142pub struct Builder<'a> {
143    client: ClientBuilder,
144    url: &'a str,
145    jwt_key: Option<EncodingKey>,
146}
147
148impl<'a> Builder<'a> {
149    /// Create a new Client Builder with a base URL (e.g. `http://localhost:7020`)
150    pub fn new<U: AsRef<str> + ?Sized>(base_url: &'a U) -> Self {
151        Self {
152            client: ClientBuilder::new(),
153            url: base_url.as_ref(),
154            jwt_key: None,
155        }
156    }
157
158    /// Add a JWT secret to authenticate against
159    pub fn add_jwt_key<T: AsRef<[u8]> + ?Sized>(mut self, key: &T) -> Self {
160        self.jwt_key = Some(EncodingKey::from_secret(key.as_ref()));
161        self
162    }
163
164    #[cfg(feature = "rustls-tls")]
165    /// Add a custom root certificate
166    pub fn add_root_certificate(mut self, cert: Certificate) -> Self {
167        self.client = self.client.add_root_certificate(cert);
168        self
169    }
170
171    /// Build the LucidClient itself
172    #[throws]
173    pub fn build(self) -> LucidClient {
174        LucidClient {
175            client: self.client.build().map_err(Error::InvalidClient)?,
176            url: Url::parse(self.url).map_err(|_| Error::InvalidUrl)?,
177            jwt_key: self.jwt_key,
178        }
179    }
180}
181
182impl LucidClient {
183    /// Build a basic Client. This is equivalent to `Builder::new(url).build()`
184    #[throws]
185    pub fn new<U: AsRef<str> + ?Sized>(base_url: &U) -> Self {
186        Builder::new(base_url).build()?
187    }
188
189    /// Configure a Client with the Builder
190    pub fn builder<'a, U: AsRef<str> + ?Sized>(base_url: &'a U) -> Builder<'a> {
191        Builder::new(base_url)
192    }
193
194    /// Health check
195    #[throws]
196    pub async fn health_check(&self) {
197        let url = self.url.join("health").map_err(|_| Error::InvalidUrl)?;
198        let res = self
199            .client
200            .head(url)
201            .send()
202            .await
203            .map_err(Error::InvalidRequest)?;
204        match res.status() {
205            StatusCode::OK => (),
206            _ => throw!(Error::InvalidResponse),
207        }
208    }
209
210    /// Store a string or bytes as a value for a key. Creates a new key if it does not exist
211    #[throws]
212    pub async fn put_raw<K: AsRef<str> + ?Sized, V: Into<Body>>(
213        &self,
214        key: &K,
215        value: V,
216    ) -> PutStatus {
217        let res = self
218            .client
219            .put(self.key_url(key)?)
220            .headers(self.authorization()?)
221            .body(value)
222            .send()
223            .await
224            .map_err(Error::InvalidRequest)?;
225        match res.status() {
226            StatusCode::OK => PutStatus::Ok,
227            StatusCode::CREATED => PutStatus::Created,
228            StatusCode::UNAUTHORIZED => throw!(Error::Unauthorized),
229            StatusCode::CONFLICT => throw!(Error::Conflict),
230            _ => throw!(Error::InvalidResponse),
231        }
232    }
233
234    /// Gets raw bytes from a key's value
235    #[throws]
236    pub async fn get_raw<K: AsRef<str> + ?Sized>(&self, key: &K) -> Option<Bytes> {
237        let res = self
238            .client
239            .get(self.key_url(key)?)
240            .headers(self.authorization()?)
241            .send()
242            .await
243            .map_err(Error::InvalidRequest)?;
244        match res.status() {
245            StatusCode::OK => Some(res.bytes().await.map_err(|_| Error::InvalidResponse)?),
246            StatusCode::NOT_FOUND => None,
247            _ => throw!(Error::InvalidResponse),
248        }
249    }
250
251    /// Delete a key's value. Returns `true` if the key existed and was actually deleted
252    #[throws]
253    pub async fn delete<K: AsRef<str> + ?Sized>(&self, key: &K) -> bool {
254        let res = self
255            .client
256            .delete(self.key_url(key)?)
257            .headers(self.authorization()?)
258            .send()
259            .await
260            .map_err(Error::InvalidRequest)?;
261        match res.status() {
262            StatusCode::OK | StatusCode::NO_CONTENT => true,
263            StatusCode::NOT_FOUND => false,
264            StatusCode::UNAUTHORIZED => throw!(Error::Unauthorized),
265            _ => throw!(Error::InvalidResponse),
266        }
267    }
268
269    /// Check if a key exists
270    #[throws]
271    pub async fn exists<K: AsRef<str> + ?Sized>(&self, key: &K) -> bool {
272        let res = self
273            .client
274            .head(self.key_url(key)?)
275            .headers(self.authorization()?)
276            .send()
277            .await
278            .map_err(Error::InvalidRequest)?;
279        match res.status() {
280            StatusCode::OK | StatusCode::NO_CONTENT => true,
281            StatusCode::NOT_FOUND => false,
282            StatusCode::UNAUTHORIZED => throw!(Error::Unauthorized),
283            _ => throw!(Error::InvalidResponse),
284        }
285    }
286
287    /// Serialize a rust object and store as the value for a key
288    #[throws]
289    pub async fn put<K: AsRef<str> + ?Sized, V: Serialize + ?Sized>(
290        &self,
291        key: &K,
292        value: &V,
293    ) -> PutStatus {
294        self.put_raw(
295            key,
296            serde_mod::to_vec(value).map_err(|_| Error::SerializeError)?,
297        )
298        .await?
299    }
300
301    /// Get the value for a key and deserialize it into a rust object
302    #[throws]
303    pub async fn get<K: AsRef<str> + ?Sized, V: DeserializeOwned>(&self, key: &K) -> Option<V> {
304        let bytes = self.get_raw(key).await?;
305        match bytes {
306            None => None,
307            Some(bytes) => {
308                Some(serde_mod::from_slice(bytes.as_ref()).map_err(|_| Error::DeserializeError)?)
309            }
310        }
311    }
312
313    /// Lock a key. Returns `false` if the key is already locked and `true` otherwise
314    #[throws]
315    pub async fn lock<K: AsRef<str> + ?Sized>(&self, key: &K) -> bool {
316        match self
317            .patch(key, &PatchValue::new(Operation::Lock, None))
318            .await
319        {
320            Ok(_) => true,
321            Err(err) => match err {
322                Error::Conflict => false,
323                err => throw!(err),
324            },
325        }
326    }
327
328    /// Unlock a key. Returns `false` if the key is already unlocked and `true` otherwise
329    #[throws]
330    pub async fn unlock<K: AsRef<str> + ?Sized>(&self, key: &K) -> bool {
331        match self
332            .patch(key, &PatchValue::new(Operation::Unlock, None))
333            .await
334        {
335            Ok(_) => true,
336            Err(err) => match err {
337                Error::Conflict => false,
338                err => throw!(err),
339            },
340        }
341    }
342
343    /// Increment a key. Note the key's value must be like `b"0"` otherwise this will throw an
344    /// [Error::NonNumericValue]
345    #[throws]
346    pub async fn increment<K: AsRef<str> + ?Sized>(&self, key: &K) {
347        self.patch(key, &PatchValue::new(Operation::Increment, None))
348            .await
349            .map_err(|err| match err {
350                Error::BadRequest => Error::NonNumericValue,
351                err => err,
352            })?
353    }
354
355    /// Decrement a key. See [LucidClient::increment] for more info.
356    #[throws]
357    pub async fn decrement<K: AsRef<str> + ?Sized>(&self, key: &K) {
358        self.patch(key, &PatchValue::new(Operation::Decrement, None))
359            .await
360            .map_err(|err| match err {
361                Error::BadRequest => Error::NonNumericValue,
362                err => err,
363            })?
364    }
365
366    /// Add a "time to live" constraint to a key
367    #[throws]
368    pub async fn ttl<K: AsRef<str> + ?Sized>(&self, key: &K, duration: Duration) {
369        self.patch(
370            key,
371            &PatchValue::new(Operation::TTL, Some(duration.as_secs().to_string())),
372        )
373        .await?
374    }
375
376    /// Get raw notification blobs
377    #[throws]
378    pub async fn notifications_raw<F, E>(
379        &self,
380        handler: F,
381    ) -> impl Stream<Item = Result<Notification<Bytes>, NotificationError<E>>>
382    where
383        F: ErrorHandler<reqwest::Error, OutError = E>,
384        E: fmt::Display + fmt::Debug + Send + Sync + 'static,
385    {
386        let url = self
387            .url
388            .join("notifications")
389            .map_err(|_| Error::InvalidUrl)?;
390        self.client
391            .get(url)
392            .headers(self.authorization()?)
393            .eventsource()
394            .unwrap()
395            .retry(NotificationsErrorHandler::new(handler))
396            .map_ok(|(event, _attempt)| Notification {
397                key: percent_encoding::percent_decode_str(&event.event.unwrap())
398                    .decode_utf8_lossy()
399                    .to_string(),
400                data: event.data.into(),
401            })
402            .map_err(|(err, usize)| NotificationError::Other(err, usize))
403    }
404
405    /// Get notifications and deserialize them into objects
406    #[throws]
407    pub async fn notifications<F, T, E>(
408        &self,
409        handler: F,
410    ) -> impl Stream<Item = Result<Notification<T>, NotificationError<E>>>
411    where
412        F: ErrorHandler<reqwest::Error, OutError = E>,
413        E: fmt::Display + fmt::Debug + Send + Sync + 'static,
414        T: DeserializeOwned,
415    {
416        self.notifications_raw(handler)
417            .await?
418            .and_then(|notification| {
419                future::ready(
420                    serde_mod::from_slice(&notification.data)
421                        .map_err(|_| NotificationError::DeserializeError(notification.data.clone()))
422                        .and_then(|data| {
423                            Ok(Notification {
424                                key: notification.key,
425                                data,
426                            })
427                        }),
428                )
429            })
430    }
431
432    #[throws]
433    async fn patch<K: AsRef<str> + ?Sized>(&self, key: &K, value: &PatchValue) {
434        let res = self
435            .client
436            .patch(self.key_url(key)?)
437            .headers(self.authorization()?)
438            .body(serde_json::to_string(&value).map_err(|_| Error::SerializeError)?)
439            .send()
440            .await
441            .map_err(Error::InvalidRequest)?;
442        match res.status() {
443            StatusCode::OK | StatusCode::NO_CONTENT => (),
444            StatusCode::NOT_FOUND => throw!(Error::NotFound),
445            StatusCode::CONFLICT => throw!(Error::Conflict),
446            StatusCode::BAD_REQUEST => throw!(Error::BadRequest),
447            StatusCode::UNAUTHORIZED => throw!(Error::Unauthorized),
448            _ => throw!(Error::InvalidResponse),
449        }
450    }
451
452    #[inline]
453    #[throws]
454    fn key_url<K: AsRef<str>>(&self, key: K) -> Url {
455        let encoded = percent_encoding::utf8_percent_encode(key.as_ref(), &URL_SET).to_string();
456        self.url
457            .join(&format!("api/kv/{}", encoded))
458            .map_err(|_| Error::InvalidUrl)?
459    }
460
461    #[inline]
462    #[throws]
463    fn authorization(&self) -> HeaderMap<HeaderValue> {
464        let mut headers = HeaderMap::default();
465        let key = if let Some(ref key) = self.jwt_key {
466            key
467        } else {
468            return headers;
469        };
470
471        let iat = match SystemTime::now().duration_since(UNIX_EPOCH) {
472            Ok(n) => n.as_secs() as i64,
473            Err(_) => panic!("SystemTime before UNIX EPOCH!"),
474        };
475        let claims = Claims {
476            iat,
477            exp: iat + 60,
478            ..Default::default()
479        };
480        let token = jsonwebtoken::encode(&jsonwebtoken::Header::default(), &claims, &key)
481            .map_err(|_| Error::InvalidJWTKey)?;
482
483        headers.append(
484            header::AUTHORIZATION,
485            format!("Bearer {}", token)
486                .parse()
487                .map_err(|_| Error::InvalidJWTKey)?,
488        );
489        headers
490    }
491}
492
493impl Operation {
494    #[inline]
495    fn as_str(self) -> &'static str {
496        match self {
497            Operation::Lock => "lock",
498            Operation::Unlock => "unlock",
499            Operation::Increment => "increment",
500            Operation::Decrement => "decrement",
501            Operation::TTL => "ttl",
502        }
503    }
504}
505
506impl PatchValue {
507    #[inline]
508    fn new(operation: Operation, value: Option<String>) -> Self {
509        Self { operation, value }
510    }
511}
512
513impl Serialize for Operation {
514    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
515    where
516        S: Serializer,
517    {
518        serializer.serialize_str(self.as_str())
519    }
520}
521
522impl<F> NotificationsErrorHandler<F>
523where
524    F: ErrorHandler<reqwest::Error>,
525{
526    fn new(inner: F) -> Self {
527        Self { inner }
528    }
529}
530
531impl<F> ErrorHandler<EventsourceError<reqwest::Error>> for NotificationsErrorHandler<F>
532where
533    F: ErrorHandler<reqwest::Error>,
534{
535    type OutError = F::OutError;
536
537    fn handle(
538        &mut self,
539        attempt: usize,
540        err: EventsourceError<reqwest::Error>,
541    ) -> RetryPolicy<Self::OutError> {
542        match err {
543            EventsourceError::Parse(_) => {
544                // ignore all parsing errors
545                RetryPolicy::Repeat
546            }
547            EventsourceError::Transport(err) => self.inner.handle(attempt, err),
548        }
549    }
550}
551
/// Integration tests. They talk to a live server on `localhost:7020` (or
/// `https://localhost:7021` with the `rustls-tls` feature), authenticated with the JWT
/// secret `"secret"`.
#[cfg(test)]
mod tests {
    use super::*;

    use futures::stream::StreamExt;
    use serde::Deserialize;

    /// Sample payload for the serialize/deserialize round-trip tests
    #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
    struct TestStruct {
        a: u32,
        b: String,
        c: Vec<u8>,
    }

    /// Build a client for the local test server
    #[throws]
    fn client() -> LucidClient {
        // `builder` is only reassigned when the `rustls-tls` feature is enabled.
        #[allow(unused_mut, unused_assignments)]
        let mut builder = LucidClient::builder("http://localhost:7020");
        #[cfg(feature = "rustls-tls")]
        {
            builder = LucidClient::builder("https://localhost:7021");
            let ca_cert = Certificate::from_pem(
                std::fs::read("test_assets/ssl/ca-cert.pem")
                    .unwrap()
                    .as_ref(),
            )
            .unwrap();
            builder = builder.add_root_certificate(ca_cert);
        }
        builder.add_jwt_key("secret").build()?
    }

    /// Read `key` and render its raw value as (lossy) UTF-8 text.
    ///
    /// Panics if the key does not exist.
    #[throws]
    async fn get_text(client: &LucidClient, key: &str) -> String {
        let raw = client.get_raw(key).await?.unwrap();
        String::from_utf8_lossy(raw.as_ref()).into_owned()
    }

    #[test]
    #[throws]
    fn build() {
        LucidClient::new("http://localhost:7020")?;
        client()?;
    }

    #[tokio::test]
    async fn put_raw() -> Result<(), Error> {
        let client = client()?;
        client.put_raw("put_raw", "value1").await?;
        Ok(())
    }

    #[tokio::test]
    async fn put_raw_bytes() -> Result<(), Error> {
        let client = client()?;
        client
            .put_raw::<_, &[u8]>("put_raw_bytes", &[0, 1, 2, 3, 4])
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn get_raw() -> Result<(), Error> {
        let client = client()?;
        let test_value = "value1";
        client.put_raw("get_raw", test_value).await?;
        assert_eq!(test_value, get_text(&client, "get_raw").await?);
        Ok(())
    }

    #[tokio::test]
    async fn update_raw() -> Result<(), Error> {
        let client = client()?;
        let key = "update_raw";

        let test_value1 = "value1";
        client.put_raw(key, test_value1).await?;
        assert_eq!(test_value1, get_text(&client, key).await?);

        let test_value2 = "value2";
        client.put_raw(key, test_value2).await?;
        assert_eq!(test_value2, get_text(&client, key).await?);

        Ok(())
    }

    #[tokio::test]
    async fn delete_missing() -> Result<(), Error> {
        let client = client()?;
        assert!(!client.delete("delete_missing").await?);
        Ok(())
    }

    #[tokio::test]
    async fn delete() -> Result<(), Error> {
        let client = client()?;
        let key = "delete";

        let test_value = "value";
        client.put_raw(key, test_value).await?;
        assert_eq!(test_value, get_text(&client, key).await?);

        assert!(client.delete(key).await?);
        let db_value = client.get_raw(key).await?;
        assert!(db_value.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn exists_false() -> Result<(), Error> {
        let client = client()?;
        assert!(!client.exists("exists_false").await?);
        Ok(())
    }

    #[tokio::test]
    async fn exists_true() -> Result<(), Error> {
        let client = client()?;
        client.put_raw("exists_true", "value").await?;
        assert!(client.exists("exists_true").await?);
        Ok(())
    }

    #[tokio::test]
    async fn lock_unlock() -> Result<(), Error> {
        let client = client()?;
        let key = "lock_unlock";

        client.put_raw(key, "value").await?;
        assert!(!client.unlock(key).await?);
        assert!(client.lock(key).await?);
        assert!(!client.lock(key).await?);
        assert!(client.unlock(key).await?);
        assert!(!client.unlock(key).await?);

        Ok(())
    }

    #[tokio::test]
    async fn missing_lock() -> Result<(), Error> {
        let client = client()?;
        let key = "missing_lock";

        assert!(matches!(client.unlock(key).await, Err(Error::NotFound)));
        assert!(matches!(client.lock(key).await, Err(Error::NotFound)));

        Ok(())
    }

    #[tokio::test]
    async fn increment_decrement() -> Result<(), Error> {
        let client = client()?;
        let key = "increment_decrement";

        client.put_raw(key, "0").await?;
        assert_eq!("0", get_text(&client, key).await?);
        client.increment(key).await?;
        assert_eq!("1", get_text(&client, key).await?);
        client.decrement(key).await?;
        assert_eq!("0", get_text(&client, key).await?);
        client.decrement(key).await?;
        assert_eq!("-1", get_text(&client, key).await?);
        client.increment(key).await?;
        assert_eq!("0", get_text(&client, key).await?);

        Ok(())
    }

    #[tokio::test]
    async fn non_numeric_increment_decrement() -> Result<(), Error> {
        let client = client()?;
        let key = "non_numeric_increment_decrement";

        client.put_raw(key, "cool").await?;
        assert!(matches!(
            client.increment(key).await,
            Err(Error::NonNumericValue)
        ));
        assert!(matches!(
            client.decrement(key).await,
            Err(Error::NonNumericValue)
        ));

        Ok(())
    }

    #[tokio::test]
    async fn ttl() -> Result<(), Error> {
        let client = client()?;
        let key = "ttl";

        client.put_raw(key, "cool").await?;
        client.ttl(key, Duration::from_secs(3)).await?;
        assert_eq!("cool", get_text(&client, key).await?);

        Ok(())
    }

    #[tokio::test]
    async fn put() -> Result<(), Error> {
        let client = client()?;
        let value = TestStruct {
            a: 1,
            b: "cool".to_string(),
            c: vec![1, 2, 3],
        };
        client.put("put", &value).await?;
        Ok(())
    }

    #[tokio::test]
    async fn get() -> Result<(), Error> {
        let client = client()?;
        let test_value = TestStruct {
            a: 1,
            b: "cool".to_string(),
            c: vec![1, 2, 3],
        };
        client.put("get", &test_value).await?;
        let db_value = client.get("get").await?;
        assert_eq!(Some(test_value), db_value);
        Ok(())
    }

    #[tokio::test]
    async fn health_check() -> Result<(), Error> {
        let client = client()?;
        client.health_check().await?;
        // Nothing is expected to listen on this port, so the check must fail.
        let client = LucidClient::new("http://localhost:9999")?;
        assert!(client.health_check().await.is_err());
        Ok(())
    }

    #[tokio::test]
    async fn notifications_raw() -> Result<(), Error> {
        let client = client()?;
        let key = "notifications_raw";
        client.put_raw(key, "value1").await?;
        let mut stream = client
            .clone()
            .notifications_raw(|err| RetryPolicy::ForwardError(err))
            .await?;
        // Poll the stream while the update is written so the event is observed.
        let (next, _) = tokio::join!(stream.next(), client.put_raw(key, "value2"));
        assert_eq!(
            next.unwrap().unwrap(),
            Notification {
                key: key.to_string(),
                data: "value2".into()
            }
        );
        Ok(())
    }

    #[tokio::test]
    async fn notifications() -> Result<(), Error> {
        let client = client()?;
        let key = "notifications";
        let test_value1 = TestStruct {
            a: 1,
            b: "value1".to_string(),
            c: vec![1, 2, 3],
        };
        let test_value2 = TestStruct {
            a: 2,
            b: "value2".to_string(),
            c: vec![4, 5, 6],
        };
        client.put(key, &test_value1).await?;
        let mut stream = client
            .clone()
            .notifications(|err| RetryPolicy::ForwardError(err))
            .await?;
        // Poll the stream while the update is written so the event is observed.
        let (next, _) = tokio::join!(stream.next(), client.put(key, &test_value2));
        assert_eq!(
            next.unwrap().unwrap(),
            Notification {
                key: key.to_string(),
                data: test_value2
            }
        );
        Ok(())
    }
}