memorix_client_redis/
lib.rs

1extern crate futures_core;
2extern crate futures_util;
3extern crate proc_macro;
4extern crate redis;
5extern crate serde;
6extern crate serde_json;
7extern crate uuid;
8
9mod utils;
10
11use redis::AsyncCommands;
12
13pub use futures_util::StreamExt;
14pub use memorix_client_redis_macros::serialization;
15use redis::Value as RedisValue;
16
// Re-exports `serde` under a stable path inside this crate and hides it from
// the generated docs.
// NOTE(review): presumably consumed by code generated by the `serialization`
// macro so downstream crates need no direct `serde` dependency — confirm.
#[doc(hidden)]
pub mod __private {
    pub extern crate serde;
}
21
/// Marker type that switches an operation on: it implements every `Can*`
/// capability trait declared in this file.
pub struct Expose;
/// Marker type with no `Can*` implementation in this file; using it as a
/// capability type parameter leaves the corresponding methods unavailable.
pub struct Hide;
24
/// A configuration value that is either a literal or taken from the process
/// environment.
#[derive(Clone)]
pub enum Value {
    /// A literal string supplied directly in code.
    String {
        value: &'static str,
    },
    /// An environment variable whose value was captured when the `Value` was
    /// constructed.
    EnvVariable {
        /// Name of the environment variable.
        name: &'static str,
        /// Captured value; `None` when the variable could not be read.
        value: Option<String>,
    },
}

impl Value {
    /// Wraps a literal string.
    pub fn from_string(value: &'static str) -> Self {
        Self::String { value }
    }

    /// Reads the environment variable `name` immediately and stores the result.
    ///
    /// A variable that is unset (or not valid Unicode) is stored as `None`;
    /// the failure is only reported later, when the value is required.
    pub fn from_env_variable(name: &'static str) -> Self {
        let value = std::env::var(name).ok();
        Self::EnvVariable { name, value }
    }

    /// Returns the underlying string.
    ///
    /// # Errors
    ///
    /// Returns an error naming the variable when this is an `EnvVariable`
    /// whose value was not available at construction time.
    fn require(&self) -> Result<String, Box<dyn std::error::Error + Sync + Send>> {
        match self {
            Self::String { value } => Ok((*value).to_string()),
            // Build the error message lazily: it is only needed when the
            // variable is missing.
            Self::EnvVariable { name, value } => value
                .clone()
                .ok_or_else(|| format!("Environment variable \"{name}\" is not set").into()),
        }
    }
}
53
/// Optional, unparsed configuration for a cache item.
#[derive(Clone, Default)]
pub struct MemorixCacheOptions {
    /// Time-to-live in milliseconds, parsed as `usize` when read.
    /// When unset it is treated as `0`, which `set` stores without expiration.
    pub ttl_ms: Option<Value>,
    /// Parsed as `bool` when read (unset means `false`); when true, a
    /// successful `get` also calls `extend` on the entry.
    pub extend_on_get: Option<Value>,
}
/// Already-parsed counterpart of `MemorixCacheOptions`.
// NOTE(review): not referenced anywhere else in this file — confirm it is
// still used by callers or generated code.
#[derive(Clone)]
pub struct MemorixCacheOptionsInner {
    /// Time-to-live in milliseconds.
    pub ttl_ms: usize,
    /// Whether reads should extend the entry's expiry.
    pub extend_on_get: bool,
}
64
65impl MemorixCacheOptions {
66    fn get_ttl_ms(&self) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
67        Ok(match self.ttl_ms.as_ref() {
68            Some(x) => x.require()?.parse()?,
69            None => 0,
70        })
71    }
72}
73impl MemorixCacheOptions {
74    fn get_extend_on_get(&self) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
75        Ok(match self.extend_on_get.as_ref() {
76            Some(x) => x.require()?.parse()?,
77            None => false,
78        })
79    }
80}
81
/// Optional, unparsed configuration for a task queue.
#[derive(Clone, Default)]
pub struct MemorixTaskOptions {
    /// Either `"fifo"` or `"lifo"`; treated as FIFO when unset.
    pub queue_type: Option<Value>,
}
86
/// Order in which `dequeue` takes items off a task list.
#[derive(Clone)]
pub enum QueueType {
    /// Pop with `BLPOP`, i.e. from the end opposite to where `enqueue` pushes
    /// (`RPUSH`), so the oldest item comes out first.
    Fifo,
    /// Pop with `BRPOP`, i.e. from the same end `enqueue` pushes to, so the
    /// newest item comes out first.
    Lifo,
}
92impl MemorixTaskOptions {
93    fn get_queue_type(&self) -> Result<QueueType, Box<dyn std::error::Error + Send + Sync>> {
94        Ok(match &self.queue_type {
95            Some(x) => {
96                let value_str = x.require()?;
97                match value_str.as_str() {
98                    "lifo" => QueueType::Lifo,
99                    "fifo" => QueueType::Fifo,
100                    _ => {
101                        return Err(format!(
102                            "no valid option for \"queue_type\", given \"{value_str}\""
103                        )
104                        .into())
105                    }
106                }
107            }
108            None => QueueType::Fifo,
109        })
110    }
111}
112
/// Shared Redis handles plus the namespace path used to prefix every key.
#[derive(Clone)]
pub struct MemorixBase {
    // Used to open additional connections (pub/sub subscriptions and
    // blocking dequeues each open their own).
    client: redis::Client,
    // Connection used for cache, publish and non-blocking task commands.
    redis: redis::aio::MultiplexedConnection,
    // Second connection opened in `new`.
    // NOTE(review): not used by any method in this file — confirm intent.
    task_redis: redis::aio::MultiplexedConnection,
    // Namespace names, outermost first; they are prepended to every key.
    namespace_name_tree: &'static [&'static str],
}
120
121impl MemorixBase {
122    pub async fn new(
123        redis_url: &Value,
124        namespace_name_tree: &'static [&'static str],
125    ) -> Result<MemorixBase, Box<dyn std::error::Error + Sync + Send>> {
126        let client = redis::Client::open(redis_url.require()?)?;
127        let redis = client.get_multiplexed_async_connection().await?;
128        let task_redis = client.get_multiplexed_async_connection().await?;
129        Ok(Self {
130            client,
131            redis,
132            task_redis,
133            namespace_name_tree,
134        })
135    }
136    pub fn from(other: Self, namespace_name_tree: &'static [&'static str]) -> Self {
137        Self {
138            client: other.client,
139            redis: other.redis,
140            task_redis: other.task_redis,
141            namespace_name_tree,
142        }
143    }
144}
145
// Capability markers for cache items: each cache method is only available
// when the matching type parameter of the item implements the trait.
pub trait CanCacheGet {}
pub trait CanCacheSet {}
pub trait CanCacheDelete {}
pub trait CanCacheExpire {}

// `Expose` grants every cache capability.
impl CanCacheGet for Expose {}
impl CanCacheSet for Expose {}
impl CanCacheDelete for Expose {}
impl CanCacheExpire for Expose {}
155
/// A cache entry family addressed by a key of type `K` with payloads of type
/// `P`. `G`, `S`, `D` and `E` are capability markers gating `get`, `set`,
/// `delete` and `expire` respectively.
pub struct MemorixCacheItem<K, P, G, S, D, E> {
    memorix_base: MemorixBase,
    // Identifier of this item; becomes part of every Redis key.
    id: String,
    // When false, the caller-supplied key is ignored while building Redis keys.
    has_key: bool,
    options: MemorixCacheOptions,
    // Ties the otherwise unused type parameters to the struct.
    _marker: std::marker::PhantomData<(K, P, G, S, D, E)>,
}
163impl<K, P, G, S, D, E> Clone for MemorixCacheItem<K, P, G, S, D, E> {
164    fn clone(&self) -> Self {
165        Self {
166            memorix_base: self.memorix_base.clone(),
167            has_key: self.has_key,
168            id: self.id.clone(),
169            options: self.options.clone(),
170            _marker: self._marker,
171        }
172    }
173}
174
175impl<K: serde::Serialize, P: serde::Serialize + serde::de::DeserializeOwned, G, S, D, E>
176    MemorixCacheItem<K, P, G, S, D, E>
177{
178    pub fn new(
179        memorix_base: MemorixBase,
180        id: String,
181        options: Option<MemorixCacheOptions>,
182    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
183        Ok(Self {
184            memorix_base,
185            id,
186            has_key: true,
187            options: options.unwrap_or_default(),
188            _marker: std::marker::PhantomData,
189        })
190    }
191    fn new_no_key(
192        memorix_base: MemorixBase,
193        id: String,
194        options: Option<MemorixCacheOptions>,
195    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
196        Ok(Self {
197            memorix_base,
198            id,
199            has_key: false,
200            options: options.unwrap_or_default(),
201            _marker: std::marker::PhantomData,
202        })
203    }
204    pub fn key(&self, key: &K) -> Result<String, Box<dyn std::error::Error + Sync + Send>> {
205        let prefix = match self.memorix_base.namespace_name_tree.len() {
206            0 => "".to_string(),
207            _ => format!(
208                "{},",
209                self.memorix_base
210                    .namespace_name_tree
211                    .iter()
212                    .map(|x| format!("\"{}\"", x))
213                    .collect::<Vec<_>>()
214                    .join(",")
215            ),
216        };
217        Ok(match self.has_key {
218            false => format!("[{}\"{}\"]", prefix, self.id),
219            true => format!("[{}\"{}\",{}]", prefix, self.id, utils::hash_key(&key)?),
220        })
221    }
222    pub async fn extend(
223        &mut self,
224        key: &K,
225    ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
226        let hashed_key = self.key(key)?;
227        let ttl_ms = match self.options.get_ttl_ms()? {
228            0 => return Err("Called extend with no ttl_ms".into()),
229            x => x,
230        };
231        let _: RedisValue = self
232            .memorix_base
233            .redis
234            .pexpire(hashed_key, ttl_ms.try_into()?)
235            .await?;
236        Ok(())
237    }
238}
239
240impl<
241        K: serde::Serialize,
242        P: serde::Serialize + serde::de::DeserializeOwned,
243        G: CanCacheGet,
244        S,
245        D,
246        E,
247    > MemorixCacheItem<K, P, G, S, D, E>
248{
249    pub async fn get(
250        &mut self,
251        key: &K,
252    ) -> Result<Option<P>, Box<dyn std::error::Error + Sync + Send>> {
253        let payload_str: Option<String> = self.memorix_base.redis.get(self.key(key)?).await?;
254
255        let payload_str = match payload_str {
256            Some(x) => x,
257            None => {
258                return Ok(None);
259            }
260        };
261
262        let payload: P = serde_json::from_str(&payload_str)?;
263
264        if self.options.get_extend_on_get()? {
265            self.extend(key).await?;
266        }
267
268        Ok(Some(payload))
269    }
270}
271
272impl<
273        K: serde::Serialize,
274        P: serde::Serialize + serde::de::DeserializeOwned,
275        G,
276        S: CanCacheSet,
277        D,
278        E,
279    > MemorixCacheItem<K, P, G, S, D, E>
280{
281    pub async fn set(
282        &mut self,
283        key: &K,
284        payload: &P,
285    ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
286        let payload_str = serde_json::to_string(&payload)?;
287        let _: RedisValue = self
288            .memorix_base
289            .redis
290            .set_options(
291                self.key(key)?,
292                payload_str,
293                match self.options.get_ttl_ms()? {
294                    0 => redis::SetOptions::default(),
295                    ttl_ms => redis::SetOptions::default()
296                        .with_expiration(redis::SetExpiry::PX(ttl_ms.try_into()?)),
297                },
298            )
299            .await?;
300
301        Ok(())
302    }
303}
304
305impl<
306        K: serde::Serialize,
307        P: serde::Serialize + serde::de::DeserializeOwned,
308        G,
309        S,
310        D: CanCacheDelete,
311        E,
312    > MemorixCacheItem<K, P, G, S, D, E>
313{
314    pub async fn delete(
315        &mut self,
316        key: &K,
317    ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
318        let _: RedisValue = self.memorix_base.redis.del(self.key(key)?).await?;
319
320        Ok(())
321    }
322}
323
324impl<
325        K: serde::Serialize,
326        P: serde::Serialize + serde::de::DeserializeOwned,
327        G,
328        S,
329        D,
330        E: CanCacheExpire,
331    > MemorixCacheItem<K, P, G, S, D, E>
332{
333    pub async fn expire(
334        &mut self,
335        key: &K,
336        ttl_ms: usize,
337    ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
338        let _: RedisValue = self
339            .memorix_base
340            .redis
341            .pexpire(self.key(key)?, ttl_ms.try_into()?)
342            .await?;
343
344        Ok(())
345    }
346}
347
348pub struct MemorixCacheItemNoKey<P, G, S, D, E> {
349    base_item:
350        MemorixCacheItem<std::marker::PhantomData<std::marker::PhantomData<u8>>, P, G, S, D, E>,
351}
352impl<P, G, S, D, E> Clone for MemorixCacheItemNoKey<P, G, S, D, E> {
353    fn clone(&self) -> Self {
354        Self {
355            base_item: self.base_item.clone(),
356        }
357    }
358}
359
360impl<P: serde::de::DeserializeOwned + serde::Serialize, G, S, D, E>
361    MemorixCacheItemNoKey<P, G, S, D, E>
362{
363    pub fn new(
364        memorix_base: MemorixBase,
365        id: String,
366        options: Option<MemorixCacheOptions>,
367    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
368        Ok(Self {
369            base_item: MemorixCacheItem::new_no_key(memorix_base, id, options)?,
370        })
371    }
372}
373impl<P: serde::de::DeserializeOwned + serde::Serialize, G: CanCacheGet, S, D, E>
374    MemorixCacheItemNoKey<P, G, S, D, E>
375{
376    pub async fn get(&mut self) -> Result<Option<P>, Box<dyn std::error::Error + Sync + Send>> {
377        self.base_item.get(&std::marker::PhantomData).await
378    }
379}
380impl<P: serde::de::DeserializeOwned + serde::Serialize, G, S: CanCacheSet, D, E>
381    MemorixCacheItemNoKey<P, G, S, D, E>
382{
383    pub async fn set(
384        &mut self,
385        payload: &P,
386    ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
387        self.base_item.set(&std::marker::PhantomData, payload).await
388    }
389}
390impl<P: serde::de::DeserializeOwned + serde::Serialize, G, S, D: CanCacheDelete, E>
391    MemorixCacheItemNoKey<P, G, S, D, E>
392{
393    pub async fn delete(&mut self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
394        self.base_item.delete(&std::marker::PhantomData).await
395    }
396}
397
// Capability markers for pub/sub items: `publish` and `subscribe` are only
// available when the matching type parameter implements the trait.
pub trait CanPubSubPublish {}
pub trait CanPubSubSubscribe {}

// `Expose` grants both pub/sub capabilities.
impl CanPubSubPublish for Expose {}
impl CanPubSubSubscribe for Expose {}
403
// `Clone` is implemented by hand below rather than derived: a derive would
// require `K`, `P`, `PU` and `S` to be `Clone` as well.
/// A pub/sub channel family addressed by a key of type `K` carrying payloads
/// of type `P`. `PU` and `S` are capability markers gating `publish` and
/// `subscribe`.
pub struct MemorixPubSubItem<K, P, PU, S> {
    memorix_base: MemorixBase,
    // Identifier of this item; becomes part of every channel name.
    id: String,
    // When false, the caller-supplied key is ignored while building names.
    has_key: bool,
    // Ties the otherwise unused type parameters to the struct.
    _marker: std::marker::PhantomData<(K, P, PU, S)>,
}
411
412impl<K, P, PU, S> Clone for MemorixPubSubItem<K, P, PU, S> {
413    fn clone(&self) -> Self {
414        Self {
415            memorix_base: self.memorix_base.clone(),
416            has_key: self.has_key,
417            id: self.id.clone(),
418            _marker: self._marker,
419        }
420    }
421}
422
423impl<K: serde::Serialize, P: serde::de::DeserializeOwned + serde::Serialize, PU, S>
424    MemorixPubSubItem<K, P, PU, S>
425{
426    pub fn new(
427        memorix_base: MemorixBase,
428        id: String,
429    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
430        Ok(Self {
431            memorix_base,
432            id,
433            has_key: true,
434            _marker: std::marker::PhantomData,
435        })
436    }
437    fn new_no_key(
438        memorix_base: MemorixBase,
439        id: String,
440    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
441        Ok(Self {
442            memorix_base,
443            id,
444            has_key: false,
445            _marker: std::marker::PhantomData,
446        })
447    }
448    pub fn key(&self, key: &K) -> Result<String, Box<dyn std::error::Error + Sync + Send>> {
449        let prefix = match self.memorix_base.namespace_name_tree.len() {
450            0 => "".to_string(),
451            _ => format!(
452                "{},",
453                self.memorix_base
454                    .namespace_name_tree
455                    .iter()
456                    .map(|x| format!("\"{}\"", x))
457                    .collect::<Vec<_>>()
458                    .join(",")
459            ),
460        };
461        Ok(match self.has_key {
462            false => format!("[{}\"{}\"]", prefix, self.id),
463            true => format!("[{}\"{}\",{}]", prefix, self.id, utils::hash_key(&key)?),
464        })
465    }
466}
467
468impl<
469        K: serde::Serialize,
470        P: serde::de::DeserializeOwned + serde::Serialize,
471        PU: CanPubSubPublish,
472        S,
473    > MemorixPubSubItem<K, P, PU, S>
474{
475    pub async fn publish(
476        &mut self,
477        key: &K,
478        payload: &P,
479    ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
480        let payload_str = serde_json::to_string(&payload)?;
481        let _: RedisValue = self
482            .memorix_base
483            .redis
484            .publish(self.key(key)?, payload_str)
485            .await?;
486        Ok(())
487    }
488}
489
490impl<
491        K: serde::Serialize,
492        P: serde::de::DeserializeOwned + serde::Serialize,
493        PU,
494        S: CanPubSubSubscribe,
495    > MemorixPubSubItem<K, P, PU, S>
496{
497    pub async fn subscribe(
498        &self,
499        key: &K,
500    ) -> Result<
501        core::pin::Pin<
502            Box<
503                dyn futures_core::stream::Stream<
504                        Item = Result<P, Box<dyn std::error::Error + Sync + Send>>,
505                    > + std::marker::Send,
506            >,
507        >,
508        Box<dyn std::error::Error + Sync + Send>,
509    > {
510        let mut pubsub = self.memorix_base.client.get_async_pubsub().await?;
511        pubsub.subscribe(self.key(key)?).await?;
512        let stream = pubsub
513            .into_on_message()
514            .map(|m| {
515                let payload = m.get_payload::<String>()?;
516                let parsed = serde_json::from_str::<P>(&payload)?;
517                Ok(parsed)
518            })
519            .boxed();
520        Ok(stream)
521    }
522}
523
/// A pub/sub item with a single channel, so it takes no key.
///
/// Thin wrapper around a `MemorixPubSubItem` built without a key component.
pub struct MemorixPubSubItemNoKey<P, PU, S> {
    // `PhantomData<u8>` is only a placeholder key type.
    base_item: MemorixPubSubItem<std::marker::PhantomData<u8>, P, PU, S>,
}
527impl<P, PU, S> Clone for MemorixPubSubItemNoKey<P, PU, S> {
528    fn clone(&self) -> Self {
529        Self {
530            base_item: self.base_item.clone(),
531        }
532    }
533}
534
535impl<P: serde::de::DeserializeOwned + serde::Serialize, PU, S> MemorixPubSubItemNoKey<P, PU, S> {
536    pub fn new(
537        memorix_base: MemorixBase,
538        id: String,
539    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
540        Ok(Self {
541            base_item: MemorixPubSubItem::new_no_key(memorix_base, id)?,
542        })
543    }
544}
545impl<P: serde::de::DeserializeOwned + serde::Serialize, PU: CanPubSubPublish, S>
546    MemorixPubSubItemNoKey<P, PU, S>
547{
548    pub async fn publish(
549        &mut self,
550        payload: &P,
551    ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
552        self.base_item
553            .publish(&std::marker::PhantomData, payload)
554            .await
555    }
556}
557impl<P: serde::de::DeserializeOwned + serde::Serialize, PU, S: CanPubSubSubscribe>
558    MemorixPubSubItemNoKey<P, PU, S>
559{
560    pub async fn subscribe(
561        &mut self,
562    ) -> Result<
563        core::pin::Pin<
564            Box<
565                dyn futures_core::stream::Stream<
566                        Item = Result<P, Box<dyn std::error::Error + Sync + Send>>,
567                    > + std::marker::Send,
568            >,
569        >,
570        Box<dyn std::error::Error + Sync + Send>,
571    > {
572        self.base_item.subscribe(&std::marker::PhantomData).await
573    }
574}
575
// Capability markers for task items: each task method is only available when
// the matching type parameter of the item implements the trait.
pub trait CanTaskEnqueue {}
pub trait CanTaskDequeue {}
pub trait CanTaskEmpty {}
pub trait CanTaskGetLen {}

// `Expose` grants every task capability.
impl CanTaskEnqueue for Expose {}
impl CanTaskDequeue for Expose {}
impl CanTaskEmpty for Expose {}
impl CanTaskGetLen for Expose {}
585
/// A task queue family addressed by a key of type `K` with payloads of type
/// `P`. `E`, `D`, `EM` and `G` are capability markers gating `enqueue`,
/// `dequeue`, `empty` and `get_len` respectively.
pub struct MemorixTaskItem<K, P, E, D, EM, G> {
    memorix_base: MemorixBase,
    // Identifier of this item; becomes part of every Redis key.
    id: String,
    // When false, the caller-supplied key is ignored while building Redis keys.
    has_key: bool,
    options: MemorixTaskOptions,
    // Ties the otherwise unused type parameters to the struct.
    _marker: std::marker::PhantomData<(K, P, E, D, EM, G)>,
}
593impl<K, P, E, D, EM, G> Clone for MemorixTaskItem<K, P, E, D, EM, G> {
594    fn clone(&self) -> Self {
595        Self {
596            memorix_base: self.memorix_base.clone(),
597            id: self.id.clone(),
598            has_key: self.has_key,
599            options: self.options.clone(),
600            _marker: self._marker,
601        }
602    }
603}
604
605impl<K: serde::Serialize, P: serde::Serialize + serde::de::DeserializeOwned, E, D, EM, G>
606    MemorixTaskItem<K, P, E, D, EM, G>
607{
608    pub fn new(
609        memorix_base: MemorixBase,
610        id: String,
611        options: Option<MemorixTaskOptions>,
612    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
613        Ok(Self {
614            memorix_base: memorix_base.clone(),
615            id: id.clone(),
616            has_key: true,
617            options: options.unwrap_or_default(),
618            _marker: std::marker::PhantomData,
619        })
620    }
621    fn new_no_key(
622        memorix_base: MemorixBase,
623        id: String,
624        options: Option<MemorixTaskOptions>,
625    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
626        Ok(Self {
627            memorix_base: memorix_base.clone(),
628            id: id.clone(),
629            has_key: false,
630            options: options.unwrap_or_default(),
631            _marker: std::marker::PhantomData,
632        })
633    }
634    pub fn key(&self, key: &K) -> Result<String, Box<dyn std::error::Error + Sync + Send>> {
635        let prefix = match self.memorix_base.namespace_name_tree.len() {
636            0 => "".to_string(),
637            _ => format!(
638                "{},",
639                self.memorix_base
640                    .namespace_name_tree
641                    .iter()
642                    .map(|x| format!("\"{}\"", x))
643                    .collect::<Vec<_>>()
644                    .join(",")
645            ),
646        };
647        Ok(match self.has_key {
648            false => format!("[{}\"{}\"]", prefix, self.id),
649            true => format!("[{}\"{}\",{}]", prefix, self.id, utils::hash_key(&key)?),
650        })
651    }
652}
653
/// Result of `enqueue`.
pub struct MemorixTaskItemEnqueue {
    /// Value returned by the push command (`RPUSH`) that `enqueue` issues.
    pub queue_size: usize,
}
657
658impl<
659        K: serde::Serialize,
660        P: serde::Serialize + serde::de::DeserializeOwned,
661        E: CanTaskEnqueue,
662        D,
663        EM,
664        G,
665    > MemorixTaskItem<K, P, E, D, EM, G>
666{
667    pub async fn enqueue(
668        &mut self,
669        key: &K,
670        payload: &P,
671    ) -> Result<MemorixTaskItemEnqueue, Box<dyn std::error::Error + Sync + Send>> {
672        let queue_size = self
673            .memorix_base
674            .redis
675            .rpush(self.key(key)?, serde_json::to_string(&payload)?)
676            .await?;
677        Ok(MemorixTaskItemEnqueue { queue_size })
678    }
679}
680
681impl<
682        K: serde::Serialize,
683        P: serde::Serialize + serde::de::DeserializeOwned,
684        E,
685        D: CanTaskDequeue,
686        EM,
687        G,
688    > MemorixTaskItem<K, P, E, D, EM, G>
689{
690    pub async fn dequeue(
691        &self,
692        key: &K,
693    ) -> Result<
694        impl futures_core::Stream<Item = Result<P, Box<dyn std::error::Error + Sync + Send>>> + '_,
695        Box<dyn std::error::Error + Sync + Send>,
696    > {
697        let key_str = self.key(key)?;
698
699        let mut redis = self
700            .memorix_base
701            .client
702            .get_multiplexed_async_connection()
703            .await?;
704
705        Ok(Box::pin(async_stream::stream! {
706            loop {
707                let (_, payload): (RedisValue, String) = (match self.options.get_queue_type()? {
708                    QueueType::Fifo => redis.blpop(key_str.to_string(), 0.0),
709                    QueueType::Lifo => redis.brpop(key_str.to_string(), 0.0),
710                })
711                .await
712                .unwrap();
713                let payload = serde_json::from_str::<'_, P>(payload.as_str())?;
714                yield Ok(payload)
715            }
716        }))
717    }
718}
719
720impl<
721        K: serde::Serialize,
722        P: serde::Serialize + serde::de::DeserializeOwned,
723        E,
724        D,
725        EM: CanTaskEmpty,
726        G,
727    > MemorixTaskItem<K, P, E, D, EM, G>
728{
729    pub async fn empty(&mut self, key: &K) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
730        let _: RedisValue = self.memorix_base.redis.del(self.key(key)?).await?;
731        Ok(())
732    }
733}
734
735impl<
736        K: serde::Serialize,
737        P: serde::Serialize + serde::de::DeserializeOwned,
738        E,
739        D,
740        EM,
741        G: CanTaskGetLen,
742    > MemorixTaskItem<K, P, E, D, EM, G>
743{
744    pub async fn get_len(
745        &mut self,
746        key: &K,
747    ) -> Result<usize, Box<dyn std::error::Error + Sync + Send>> {
748        let queue_size = self.memorix_base.redis.llen(self.key(key)?).await?;
749        Ok(queue_size)
750    }
751}
752
/// A task item with a single queue, so it takes no key.
///
/// Thin wrapper around a `MemorixTaskItem` built without a key component.
pub struct MemorixTaskItemNoKey<P, E, D, EM, G> {
    // `PhantomData<u8>` is only a placeholder key type.
    base_item: MemorixTaskItem<std::marker::PhantomData<u8>, P, E, D, EM, G>,
}
756impl<P, E, D, EM, G> Clone for MemorixTaskItemNoKey<P, E, D, EM, G> {
757    fn clone(&self) -> Self {
758        Self {
759            base_item: self.base_item.clone(),
760        }
761    }
762}
763
764impl<P: serde::Serialize + serde::de::DeserializeOwned, E, D, EM, G>
765    MemorixTaskItemNoKey<P, E, D, EM, G>
766{
767    pub fn new(
768        memorix_base: MemorixBase,
769        id: String,
770        options: Option<MemorixTaskOptions>,
771    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
772        Ok(Self {
773            base_item: MemorixTaskItem::new_no_key(memorix_base, id, options)?,
774        })
775    }
776}
777impl<P: serde::Serialize + serde::de::DeserializeOwned, E: CanTaskEnqueue, D, EM, G>
778    MemorixTaskItemNoKey<P, E, D, EM, G>
779{
780    pub async fn enqueue(
781        &mut self,
782        payload: &P,
783    ) -> Result<MemorixTaskItemEnqueue, Box<dyn std::error::Error + Sync + Send>> {
784        self.base_item
785            .enqueue(&std::marker::PhantomData, payload)
786            .await
787    }
788}
789impl<P: serde::Serialize + serde::de::DeserializeOwned, E, D: CanTaskDequeue, EM, G>
790    MemorixTaskItemNoKey<P, E, D, EM, G>
791{
792    pub async fn dequeue(
793        &self,
794    ) -> Result<
795        impl futures_core::Stream<Item = Result<P, Box<dyn std::error::Error + Sync + Send>>> + '_,
796        Box<dyn std::error::Error + Sync + Send>,
797    > {
798        self.base_item.dequeue(&std::marker::PhantomData).await
799    }
800}
801impl<P: serde::Serialize + serde::de::DeserializeOwned, E, D, EM: CanTaskEmpty, G>
802    MemorixTaskItemNoKey<P, E, D, EM, G>
803{
804    pub async fn empty(&mut self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
805        self.base_item.empty(&std::marker::PhantomData).await
806    }
807}
808impl<P: serde::Serialize + serde::de::DeserializeOwned, E, D, EM, G: CanTaskGetLen>
809    MemorixTaskItemNoKey<P, E, D, EM, G>
810{
811    pub async fn get_len(&mut self) -> Result<usize, Box<dyn std::error::Error + Sync + Send>> {
812        self.base_item.get_len(&std::marker::PhantomData).await
813    }
814}