1extern crate futures_core;
2extern crate futures_util;
3extern crate proc_macro;
4extern crate redis;
5extern crate serde;
6extern crate serde_json;
7extern crate uuid;
8
9mod utils;
10
11use redis::AsyncCommands;
12
13pub use futures_util::StreamExt;
14pub use memorix_client_redis_macros::serialization;
15use redis::Value as RedisValue;
16
// Re-exports the `serde` crate under a path that is hidden from the rendered docs.
// NOTE(review): presumably referenced by the expansion of the `serialization` macro — confirm.
#[doc(hidden)]
pub mod __private {
    pub extern crate serde;
}
21
/// Marker type that implements every `Can*` capability trait declared in this file.
pub struct Expose;
/// Marker type that implements none of the capability traits in the visible source.
pub struct Hide;
24
/// A configuration value that is either a literal or taken from the process environment.
#[derive(Clone)]
pub enum Value {
    /// A literal string supplied in code.
    String {
        value: &'static str,
    },
    /// An environment variable. `value` is whatever was read when the variant was
    /// constructed; `None` means the lookup failed at that time.
    EnvVariable {
        name: &'static str,
        value: Option<String>,
    },
}

impl Value {
    /// Wraps a literal string.
    pub fn from_string(value: &'static str) -> Self {
        Self::String { value }
    }

    /// Reads the environment variable `name` immediately and stores the outcome.
    pub fn from_env_variable(name: &'static str) -> Self {
        // `.ok()` folds every lookup failure (missing or not valid unicode) into `None`.
        let value = std::env::var(name).ok();
        Self::EnvVariable { name, value }
    }

    /// Returns the contained string.
    ///
    /// # Errors
    /// Returns an error naming the variable when an `EnvVariable` holds no value.
    fn require(&self) -> Result<String, Box<dyn std::error::Error + Sync + Send>> {
        match self {
            Self::String { value } => Ok((*value).to_string()),
            // `ok_or_else` defers building the message to the failure path only.
            Self::EnvVariable { name, value } => value
                .clone()
                .ok_or_else(|| format!("Environment variable \"{name}\" is not set").into()),
        }
    }
}
53
/// Cache options as supplied by the caller; every field is an optional [`Value`]
/// that is resolved and parsed on demand.
#[derive(Clone, Default)]
pub struct MemorixCacheOptions {
    /// Time-to-live in milliseconds; parsed to `usize`, treated as `0` when absent.
    pub ttl_ms: Option<Value>,
    /// Whether a successful `get` should also extend the entry; parsed to `bool`,
    /// treated as `false` when absent.
    pub extend_on_get: Option<Value>,
}
/// Cache options with concrete field types.
/// NOTE(review): not referenced by any other code visible in this file.
#[derive(Clone)]
pub struct MemorixCacheOptionsInner {
    pub ttl_ms: usize,
    pub extend_on_get: bool,
}
64
65impl MemorixCacheOptions {
66 fn get_ttl_ms(&self) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
67 Ok(match self.ttl_ms.as_ref() {
68 Some(x) => x.require()?.parse()?,
69 None => 0,
70 })
71 }
72}
73impl MemorixCacheOptions {
74 fn get_extend_on_get(&self) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
75 Ok(match self.extend_on_get.as_ref() {
76 Some(x) => x.require()?.parse()?,
77 None => false,
78 })
79 }
80}
81
/// Task options as supplied by the caller.
#[derive(Clone, Default)]
pub struct MemorixTaskOptions {
    /// Queue discipline as text (`"fifo"` or `"lifo"`); FIFO is used when absent.
    pub queue_type: Option<Value>,
}

/// Order in which queued payloads are handed out by `dequeue`.
#[derive(Clone)]
pub enum QueueType {
    Fifo,
    Lifo,
}
92impl MemorixTaskOptions {
93 fn get_queue_type(&self) -> Result<QueueType, Box<dyn std::error::Error + Send + Sync>> {
94 Ok(match &self.queue_type {
95 Some(x) => {
96 let value_str = x.require()?;
97 match value_str.as_str() {
98 "lifo" => QueueType::Lifo,
99 "fifo" => QueueType::Fifo,
100 _ => {
101 return Err(format!(
102 "no valid option for \"queue_type\", given \"{value_str}\""
103 )
104 .into())
105 }
106 }
107 }
108 None => QueueType::Fifo,
109 })
110 }
111}
112
/// Shared Redis handles plus the namespace path used when building keys.
#[derive(Clone)]
pub struct MemorixBase {
    // Client kept so that further connections (pub/sub, dequeue) can be opened later.
    client: redis::Client,
    // Connection used by the cache, publish and task commands in this file.
    redis: redis::aio::MultiplexedConnection,
    // NOTE(review): second connection; not used by any code visible in this file.
    task_redis: redis::aio::MultiplexedConnection,
    // Namespace names that are prepended to every generated key.
    namespace_name_tree: &'static [&'static str],
}
120
121impl MemorixBase {
122 pub async fn new(
123 redis_url: &Value,
124 namespace_name_tree: &'static [&'static str],
125 ) -> Result<MemorixBase, Box<dyn std::error::Error + Sync + Send>> {
126 let client = redis::Client::open(redis_url.require()?)?;
127 let redis = client.get_multiplexed_async_connection().await?;
128 let task_redis = client.get_multiplexed_async_connection().await?;
129 Ok(Self {
130 client,
131 redis,
132 task_redis,
133 namespace_name_tree,
134 })
135 }
136 pub fn from(other: Self, namespace_name_tree: &'static [&'static str]) -> Self {
137 Self {
138 client: other.client,
139 redis: other.redis,
140 task_redis: other.task_redis,
141 namespace_name_tree,
142 }
143 }
144}
145
/// Capability marker required by the cache item's `get`.
pub trait CanCacheGet {}
/// Capability marker required by the cache item's `set`.
pub trait CanCacheSet {}
/// Capability marker required by the cache item's `delete`.
pub trait CanCacheDelete {}
/// Capability marker required by the cache item's `expire`.
pub trait CanCacheExpire {}

// `Expose` grants every cache capability.
impl CanCacheGet for Expose {}
impl CanCacheSet for Expose {}
impl CanCacheDelete for Expose {}
impl CanCacheExpire for Expose {}
155
156pub struct MemorixCacheItem<K, P, G, S, D, E> {
157 memorix_base: MemorixBase,
158 id: String,
159 has_key: bool,
160 options: MemorixCacheOptions,
161 _marker: std::marker::PhantomData<(K, P, G, S, D, E)>,
162}
163impl<K, P, G, S, D, E> Clone for MemorixCacheItem<K, P, G, S, D, E> {
164 fn clone(&self) -> Self {
165 Self {
166 memorix_base: self.memorix_base.clone(),
167 has_key: self.has_key,
168 id: self.id.clone(),
169 options: self.options.clone(),
170 _marker: self._marker,
171 }
172 }
173}
174
175impl<K: serde::Serialize, P: serde::Serialize + serde::de::DeserializeOwned, G, S, D, E>
176 MemorixCacheItem<K, P, G, S, D, E>
177{
178 pub fn new(
179 memorix_base: MemorixBase,
180 id: String,
181 options: Option<MemorixCacheOptions>,
182 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
183 Ok(Self {
184 memorix_base,
185 id,
186 has_key: true,
187 options: options.unwrap_or_default(),
188 _marker: std::marker::PhantomData,
189 })
190 }
191 fn new_no_key(
192 memorix_base: MemorixBase,
193 id: String,
194 options: Option<MemorixCacheOptions>,
195 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
196 Ok(Self {
197 memorix_base,
198 id,
199 has_key: false,
200 options: options.unwrap_or_default(),
201 _marker: std::marker::PhantomData,
202 })
203 }
204 pub fn key(&self, key: &K) -> Result<String, Box<dyn std::error::Error + Sync + Send>> {
205 let prefix = match self.memorix_base.namespace_name_tree.len() {
206 0 => "".to_string(),
207 _ => format!(
208 "{},",
209 self.memorix_base
210 .namespace_name_tree
211 .iter()
212 .map(|x| format!("\"{}\"", x))
213 .collect::<Vec<_>>()
214 .join(",")
215 ),
216 };
217 Ok(match self.has_key {
218 false => format!("[{}\"{}\"]", prefix, self.id),
219 true => format!("[{}\"{}\",{}]", prefix, self.id, utils::hash_key(&key)?),
220 })
221 }
222 pub async fn extend(
223 &mut self,
224 key: &K,
225 ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
226 let hashed_key = self.key(key)?;
227 let ttl_ms = match self.options.get_ttl_ms()? {
228 0 => return Err("Called extend with no ttl_ms".into()),
229 x => x,
230 };
231 let _: RedisValue = self
232 .memorix_base
233 .redis
234 .pexpire(hashed_key, ttl_ms.try_into()?)
235 .await?;
236 Ok(())
237 }
238}
239
240impl<
241 K: serde::Serialize,
242 P: serde::Serialize + serde::de::DeserializeOwned,
243 G: CanCacheGet,
244 S,
245 D,
246 E,
247 > MemorixCacheItem<K, P, G, S, D, E>
248{
249 pub async fn get(
250 &mut self,
251 key: &K,
252 ) -> Result<Option<P>, Box<dyn std::error::Error + Sync + Send>> {
253 let payload_str: Option<String> = self.memorix_base.redis.get(self.key(key)?).await?;
254
255 let payload_str = match payload_str {
256 Some(x) => x,
257 None => {
258 return Ok(None);
259 }
260 };
261
262 let payload: P = serde_json::from_str(&payload_str)?;
263
264 if self.options.get_extend_on_get()? {
265 self.extend(key).await?;
266 }
267
268 Ok(Some(payload))
269 }
270}
271
272impl<
273 K: serde::Serialize,
274 P: serde::Serialize + serde::de::DeserializeOwned,
275 G,
276 S: CanCacheSet,
277 D,
278 E,
279 > MemorixCacheItem<K, P, G, S, D, E>
280{
281 pub async fn set(
282 &mut self,
283 key: &K,
284 payload: &P,
285 ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
286 let payload_str = serde_json::to_string(&payload)?;
287 let _: RedisValue = self
288 .memorix_base
289 .redis
290 .set_options(
291 self.key(key)?,
292 payload_str,
293 match self.options.get_ttl_ms()? {
294 0 => redis::SetOptions::default(),
295 ttl_ms => redis::SetOptions::default()
296 .with_expiration(redis::SetExpiry::PX(ttl_ms.try_into()?)),
297 },
298 )
299 .await?;
300
301 Ok(())
302 }
303}
304
305impl<
306 K: serde::Serialize,
307 P: serde::Serialize + serde::de::DeserializeOwned,
308 G,
309 S,
310 D: CanCacheDelete,
311 E,
312 > MemorixCacheItem<K, P, G, S, D, E>
313{
314 pub async fn delete(
315 &mut self,
316 key: &K,
317 ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
318 let _: RedisValue = self.memorix_base.redis.del(self.key(key)?).await?;
319
320 Ok(())
321 }
322}
323
324impl<
325 K: serde::Serialize,
326 P: serde::Serialize + serde::de::DeserializeOwned,
327 G,
328 S,
329 D,
330 E: CanCacheExpire,
331 > MemorixCacheItem<K, P, G, S, D, E>
332{
333 pub async fn expire(
334 &mut self,
335 key: &K,
336 ttl_ms: usize,
337 ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
338 let _: RedisValue = self
339 .memorix_base
340 .redis
341 .pexpire(self.key(key)?, ttl_ms.try_into()?)
342 .await?;
343
344 Ok(())
345 }
346}
347
348pub struct MemorixCacheItemNoKey<P, G, S, D, E> {
349 base_item:
350 MemorixCacheItem<std::marker::PhantomData<std::marker::PhantomData<u8>>, P, G, S, D, E>,
351}
352impl<P, G, S, D, E> Clone for MemorixCacheItemNoKey<P, G, S, D, E> {
353 fn clone(&self) -> Self {
354 Self {
355 base_item: self.base_item.clone(),
356 }
357 }
358}
359
360impl<P: serde::de::DeserializeOwned + serde::Serialize, G, S, D, E>
361 MemorixCacheItemNoKey<P, G, S, D, E>
362{
363 pub fn new(
364 memorix_base: MemorixBase,
365 id: String,
366 options: Option<MemorixCacheOptions>,
367 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
368 Ok(Self {
369 base_item: MemorixCacheItem::new_no_key(memorix_base, id, options)?,
370 })
371 }
372}
373impl<P: serde::de::DeserializeOwned + serde::Serialize, G: CanCacheGet, S, D, E>
374 MemorixCacheItemNoKey<P, G, S, D, E>
375{
376 pub async fn get(&mut self) -> Result<Option<P>, Box<dyn std::error::Error + Sync + Send>> {
377 self.base_item.get(&std::marker::PhantomData).await
378 }
379}
380impl<P: serde::de::DeserializeOwned + serde::Serialize, G, S: CanCacheSet, D, E>
381 MemorixCacheItemNoKey<P, G, S, D, E>
382{
383 pub async fn set(
384 &mut self,
385 payload: &P,
386 ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
387 self.base_item.set(&std::marker::PhantomData, payload).await
388 }
389}
390impl<P: serde::de::DeserializeOwned + serde::Serialize, G, S, D: CanCacheDelete, E>
391 MemorixCacheItemNoKey<P, G, S, D, E>
392{
393 pub async fn delete(&mut self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
394 self.base_item.delete(&std::marker::PhantomData).await
395 }
396}
397
/// Capability marker required by the pub/sub item's `publish`.
pub trait CanPubSubPublish {}
/// Capability marker required by the pub/sub item's `subscribe`.
pub trait CanPubSubSubscribe {}

// `Expose` grants every pub/sub capability.
impl CanPubSubPublish for Expose {}
impl CanPubSubSubscribe for Expose {}
403
404pub struct MemorixPubSubItem<K, P, PU, S> {
406 memorix_base: MemorixBase,
407 id: String,
408 has_key: bool,
409 _marker: std::marker::PhantomData<(K, P, PU, S)>,
410}
411
412impl<K, P, PU, S> Clone for MemorixPubSubItem<K, P, PU, S> {
413 fn clone(&self) -> Self {
414 Self {
415 memorix_base: self.memorix_base.clone(),
416 has_key: self.has_key,
417 id: self.id.clone(),
418 _marker: self._marker,
419 }
420 }
421}
422
423impl<K: serde::Serialize, P: serde::de::DeserializeOwned + serde::Serialize, PU, S>
424 MemorixPubSubItem<K, P, PU, S>
425{
426 pub fn new(
427 memorix_base: MemorixBase,
428 id: String,
429 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
430 Ok(Self {
431 memorix_base,
432 id,
433 has_key: true,
434 _marker: std::marker::PhantomData,
435 })
436 }
437 fn new_no_key(
438 memorix_base: MemorixBase,
439 id: String,
440 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
441 Ok(Self {
442 memorix_base,
443 id,
444 has_key: false,
445 _marker: std::marker::PhantomData,
446 })
447 }
448 pub fn key(&self, key: &K) -> Result<String, Box<dyn std::error::Error + Sync + Send>> {
449 let prefix = match self.memorix_base.namespace_name_tree.len() {
450 0 => "".to_string(),
451 _ => format!(
452 "{},",
453 self.memorix_base
454 .namespace_name_tree
455 .iter()
456 .map(|x| format!("\"{}\"", x))
457 .collect::<Vec<_>>()
458 .join(",")
459 ),
460 };
461 Ok(match self.has_key {
462 false => format!("[{}\"{}\"]", prefix, self.id),
463 true => format!("[{}\"{}\",{}]", prefix, self.id, utils::hash_key(&key)?),
464 })
465 }
466}
467
468impl<
469 K: serde::Serialize,
470 P: serde::de::DeserializeOwned + serde::Serialize,
471 PU: CanPubSubPublish,
472 S,
473 > MemorixPubSubItem<K, P, PU, S>
474{
475 pub async fn publish(
476 &mut self,
477 key: &K,
478 payload: &P,
479 ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
480 let payload_str = serde_json::to_string(&payload)?;
481 let _: RedisValue = self
482 .memorix_base
483 .redis
484 .publish(self.key(key)?, payload_str)
485 .await?;
486 Ok(())
487 }
488}
489
490impl<
491 K: serde::Serialize,
492 P: serde::de::DeserializeOwned + serde::Serialize,
493 PU,
494 S: CanPubSubSubscribe,
495 > MemorixPubSubItem<K, P, PU, S>
496{
497 pub async fn subscribe(
498 &self,
499 key: &K,
500 ) -> Result<
501 core::pin::Pin<
502 Box<
503 dyn futures_core::stream::Stream<
504 Item = Result<P, Box<dyn std::error::Error + Sync + Send>>,
505 > + std::marker::Send,
506 >,
507 >,
508 Box<dyn std::error::Error + Sync + Send>,
509 > {
510 let mut pubsub = self.memorix_base.client.get_async_pubsub().await?;
511 pubsub.subscribe(self.key(key)?).await?;
512 let stream = pubsub
513 .into_on_message()
514 .map(|m| {
515 let payload = m.get_payload::<String>()?;
516 let parsed = serde_json::from_str::<P>(&payload)?;
517 Ok(parsed)
518 })
519 .boxed();
520 Ok(stream)
521 }
522}
523
524pub struct MemorixPubSubItemNoKey<P, PU, S> {
525 base_item: MemorixPubSubItem<std::marker::PhantomData<u8>, P, PU, S>,
526}
527impl<P, PU, S> Clone for MemorixPubSubItemNoKey<P, PU, S> {
528 fn clone(&self) -> Self {
529 Self {
530 base_item: self.base_item.clone(),
531 }
532 }
533}
534
535impl<P: serde::de::DeserializeOwned + serde::Serialize, PU, S> MemorixPubSubItemNoKey<P, PU, S> {
536 pub fn new(
537 memorix_base: MemorixBase,
538 id: String,
539 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
540 Ok(Self {
541 base_item: MemorixPubSubItem::new_no_key(memorix_base, id)?,
542 })
543 }
544}
545impl<P: serde::de::DeserializeOwned + serde::Serialize, PU: CanPubSubPublish, S>
546 MemorixPubSubItemNoKey<P, PU, S>
547{
548 pub async fn publish(
549 &mut self,
550 payload: &P,
551 ) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
552 self.base_item
553 .publish(&std::marker::PhantomData, payload)
554 .await
555 }
556}
557impl<P: serde::de::DeserializeOwned + serde::Serialize, PU, S: CanPubSubSubscribe>
558 MemorixPubSubItemNoKey<P, PU, S>
559{
560 pub async fn subscribe(
561 &mut self,
562 ) -> Result<
563 core::pin::Pin<
564 Box<
565 dyn futures_core::stream::Stream<
566 Item = Result<P, Box<dyn std::error::Error + Sync + Send>>,
567 > + std::marker::Send,
568 >,
569 >,
570 Box<dyn std::error::Error + Sync + Send>,
571 > {
572 self.base_item.subscribe(&std::marker::PhantomData).await
573 }
574}
575
/// Capability marker required by the task item's `enqueue`.
pub trait CanTaskEnqueue {}
/// Capability marker required by the task item's `dequeue`.
pub trait CanTaskDequeue {}
/// Capability marker required by the task item's `empty`.
pub trait CanTaskEmpty {}
/// Capability marker required by the task item's `get_len`.
pub trait CanTaskGetLen {}

// `Expose` grants every task capability.
impl CanTaskEnqueue for Expose {}
impl CanTaskDequeue for Expose {}
impl CanTaskEmpty for Expose {}
impl CanTaskGetLen for Expose {}
585
586pub struct MemorixTaskItem<K, P, E, D, EM, G> {
587 memorix_base: MemorixBase,
588 id: String,
589 has_key: bool,
590 options: MemorixTaskOptions,
591 _marker: std::marker::PhantomData<(K, P, E, D, EM, G)>,
592}
593impl<K, P, E, D, EM, G> Clone for MemorixTaskItem<K, P, E, D, EM, G> {
594 fn clone(&self) -> Self {
595 Self {
596 memorix_base: self.memorix_base.clone(),
597 id: self.id.clone(),
598 has_key: self.has_key,
599 options: self.options.clone(),
600 _marker: self._marker,
601 }
602 }
603}
604
605impl<K: serde::Serialize, P: serde::Serialize + serde::de::DeserializeOwned, E, D, EM, G>
606 MemorixTaskItem<K, P, E, D, EM, G>
607{
608 pub fn new(
609 memorix_base: MemorixBase,
610 id: String,
611 options: Option<MemorixTaskOptions>,
612 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
613 Ok(Self {
614 memorix_base: memorix_base.clone(),
615 id: id.clone(),
616 has_key: true,
617 options: options.unwrap_or_default(),
618 _marker: std::marker::PhantomData,
619 })
620 }
621 fn new_no_key(
622 memorix_base: MemorixBase,
623 id: String,
624 options: Option<MemorixTaskOptions>,
625 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
626 Ok(Self {
627 memorix_base: memorix_base.clone(),
628 id: id.clone(),
629 has_key: false,
630 options: options.unwrap_or_default(),
631 _marker: std::marker::PhantomData,
632 })
633 }
634 pub fn key(&self, key: &K) -> Result<String, Box<dyn std::error::Error + Sync + Send>> {
635 let prefix = match self.memorix_base.namespace_name_tree.len() {
636 0 => "".to_string(),
637 _ => format!(
638 "{},",
639 self.memorix_base
640 .namespace_name_tree
641 .iter()
642 .map(|x| format!("\"{}\"", x))
643 .collect::<Vec<_>>()
644 .join(",")
645 ),
646 };
647 Ok(match self.has_key {
648 false => format!("[{}\"{}\"]", prefix, self.id),
649 true => format!("[{}\"{}\",{}]", prefix, self.id, utils::hash_key(&key)?),
650 })
651 }
652}
653
/// Result of an `enqueue` call.
pub struct MemorixTaskItemEnqueue {
    /// The value returned by the `rpush` command issued in `enqueue`.
    pub queue_size: usize,
}
657
658impl<
659 K: serde::Serialize,
660 P: serde::Serialize + serde::de::DeserializeOwned,
661 E: CanTaskEnqueue,
662 D,
663 EM,
664 G,
665 > MemorixTaskItem<K, P, E, D, EM, G>
666{
667 pub async fn enqueue(
668 &mut self,
669 key: &K,
670 payload: &P,
671 ) -> Result<MemorixTaskItemEnqueue, Box<dyn std::error::Error + Sync + Send>> {
672 let queue_size = self
673 .memorix_base
674 .redis
675 .rpush(self.key(key)?, serde_json::to_string(&payload)?)
676 .await?;
677 Ok(MemorixTaskItemEnqueue { queue_size })
678 }
679}
680
681impl<
682 K: serde::Serialize,
683 P: serde::Serialize + serde::de::DeserializeOwned,
684 E,
685 D: CanTaskDequeue,
686 EM,
687 G,
688 > MemorixTaskItem<K, P, E, D, EM, G>
689{
690 pub async fn dequeue(
691 &self,
692 key: &K,
693 ) -> Result<
694 impl futures_core::Stream<Item = Result<P, Box<dyn std::error::Error + Sync + Send>>> + '_,
695 Box<dyn std::error::Error + Sync + Send>,
696 > {
697 let key_str = self.key(key)?;
698
699 let mut redis = self
700 .memorix_base
701 .client
702 .get_multiplexed_async_connection()
703 .await?;
704
705 Ok(Box::pin(async_stream::stream! {
706 loop {
707 let (_, payload): (RedisValue, String) = (match self.options.get_queue_type()? {
708 QueueType::Fifo => redis.blpop(key_str.to_string(), 0.0),
709 QueueType::Lifo => redis.brpop(key_str.to_string(), 0.0),
710 })
711 .await
712 .unwrap();
713 let payload = serde_json::from_str::<'_, P>(payload.as_str())?;
714 yield Ok(payload)
715 }
716 }))
717 }
718}
719
720impl<
721 K: serde::Serialize,
722 P: serde::Serialize + serde::de::DeserializeOwned,
723 E,
724 D,
725 EM: CanTaskEmpty,
726 G,
727 > MemorixTaskItem<K, P, E, D, EM, G>
728{
729 pub async fn empty(&mut self, key: &K) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
730 let _: RedisValue = self.memorix_base.redis.del(self.key(key)?).await?;
731 Ok(())
732 }
733}
734
735impl<
736 K: serde::Serialize,
737 P: serde::Serialize + serde::de::DeserializeOwned,
738 E,
739 D,
740 EM,
741 G: CanTaskGetLen,
742 > MemorixTaskItem<K, P, E, D, EM, G>
743{
744 pub async fn get_len(
745 &mut self,
746 key: &K,
747 ) -> Result<usize, Box<dyn std::error::Error + Sync + Send>> {
748 let queue_size = self.memorix_base.redis.llen(self.key(key)?).await?;
749 Ok(queue_size)
750 }
751}
752
753pub struct MemorixTaskItemNoKey<P, E, D, EM, G> {
754 base_item: MemorixTaskItem<std::marker::PhantomData<u8>, P, E, D, EM, G>,
755}
756impl<P, E, D, EM, G> Clone for MemorixTaskItemNoKey<P, E, D, EM, G> {
757 fn clone(&self) -> Self {
758 Self {
759 base_item: self.base_item.clone(),
760 }
761 }
762}
763
764impl<P: serde::Serialize + serde::de::DeserializeOwned, E, D, EM, G>
765 MemorixTaskItemNoKey<P, E, D, EM, G>
766{
767 pub fn new(
768 memorix_base: MemorixBase,
769 id: String,
770 options: Option<MemorixTaskOptions>,
771 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
772 Ok(Self {
773 base_item: MemorixTaskItem::new_no_key(memorix_base, id, options)?,
774 })
775 }
776}
777impl<P: serde::Serialize + serde::de::DeserializeOwned, E: CanTaskEnqueue, D, EM, G>
778 MemorixTaskItemNoKey<P, E, D, EM, G>
779{
780 pub async fn enqueue(
781 &mut self,
782 payload: &P,
783 ) -> Result<MemorixTaskItemEnqueue, Box<dyn std::error::Error + Sync + Send>> {
784 self.base_item
785 .enqueue(&std::marker::PhantomData, payload)
786 .await
787 }
788}
impl<P: serde::Serialize + serde::de::DeserializeOwned, E, D: CanTaskDequeue, EM, G>
    MemorixTaskItemNoKey<P, E, D, EM, G>
{
    /// Returns the stream of dequeued payloads; delegates to the wrapped item's
    /// `dequeue` with a placeholder key.
    pub async fn dequeue(
        &self,
    ) -> Result<
        impl futures_core::Stream<Item = Result<P, Box<dyn std::error::Error + Sync + Send>>> + '_,
        Box<dyn std::error::Error + Sync + Send>,
    > {
        // NOTE(review): the placeholder is passed as a temporary on purpose; binding it
        // to a local first may tie the returned stream to that local's lifetime — confirm
        // before refactoring.
        self.base_item.dequeue(&std::marker::PhantomData).await
    }
}
801impl<P: serde::Serialize + serde::de::DeserializeOwned, E, D, EM: CanTaskEmpty, G>
802 MemorixTaskItemNoKey<P, E, D, EM, G>
803{
804 pub async fn empty(&mut self) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
805 self.base_item.empty(&std::marker::PhantomData).await
806 }
807}
808impl<P: serde::Serialize + serde::de::DeserializeOwned, E, D, EM, G: CanTaskGetLen>
809 MemorixTaskItemNoKey<P, E, D, EM, G>
810{
811 pub async fn get_len(&mut self) -> Result<usize, Box<dyn std::error::Error + Sync + Send>> {
812 self.base_item.get_len(&std::marker::PhantomData).await
813 }
814}