drasi_core/index_cache/
shadowed_future_queue.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use tokio::sync::RwLock;
19
20use crate::{
21 interface::{FutureElementRef, FutureQueue, IndexError, PushType},
22 models::{ElementReference, ElementTimestamp},
23};
24
25enum HeadItemShadow {
26 Known(Option<ElementTimestamp>),
27 Unknown,
28}
29
30pub struct ShadowedFutureQueue {
31 inner: Arc<dyn FutureQueue>,
32 head_shadow: RwLock<HeadItemShadow>,
33}
34
35impl ShadowedFutureQueue {
36 pub fn new(inner: Arc<dyn FutureQueue>) -> Self {
37 Self {
38 inner,
39 head_shadow: RwLock::new(HeadItemShadow::Unknown),
40 }
41 }
42}
43
44#[async_trait]
45impl FutureQueue for ShadowedFutureQueue {
46 async fn push(
47 &self,
48 push_type: PushType,
49 position_in_query: usize,
50 group_signature: u64,
51 element_ref: &ElementReference,
52 original_time: ElementTimestamp,
53 due_time: ElementTimestamp,
54 ) -> Result<bool, IndexError> {
55 let result = self
56 .inner
57 .push(
58 push_type,
59 position_in_query,
60 group_signature,
61 element_ref,
62 original_time,
63 due_time,
64 )
65 .await?;
66
67 if push_type == PushType::Overwrite {
68 let mut head_shadow = self.head_shadow.write().await;
69 *head_shadow = HeadItemShadow::Unknown;
70 return Ok(result);
71 }
72
73 if result {
74 let mut head_shadow = self.head_shadow.write().await;
75
76 match *head_shadow {
77 HeadItemShadow::Known(Some(head_due_time)) => {
78 if due_time < head_due_time {
79 *head_shadow = HeadItemShadow::Known(Some(due_time));
80 }
81 }
82 HeadItemShadow::Known(None) => {
83 *head_shadow = HeadItemShadow::Known(Some(due_time));
84 }
85 HeadItemShadow::Unknown => {}
86 }
87 }
88
89 Ok(result)
90 }
91
92 async fn remove(
93 &self,
94 position_in_query: usize,
95 group_signature: u64,
96 ) -> Result<(), IndexError> {
97 let mut shadow = self.head_shadow.write().await;
98 *shadow = HeadItemShadow::Unknown;
99 self.inner.remove(position_in_query, group_signature).await
100 }
101
102 async fn pop(&self) -> Result<Option<FutureElementRef>, IndexError> {
103 let mut shadow = self.head_shadow.write().await;
104 *shadow = HeadItemShadow::Unknown;
105 self.inner.pop().await
106 }
107
108 async fn peek_due_time(&self) -> Result<Option<ElementTimestamp>, IndexError> {
109 let mut shadow = self.head_shadow.write().await;
110 match *shadow {
111 HeadItemShadow::Known(Some(head_due_time)) => Ok(Some(head_due_time)),
112 HeadItemShadow::Known(None) => Ok(None),
113 HeadItemShadow::Unknown => {
114 let result = self.inner.peek_due_time().await?;
115 *shadow = HeadItemShadow::Known(result);
116 Ok(result)
117 }
118 }
119 }
120
121 async fn clear(&self) -> Result<(), IndexError> {
122 let mut shadow = self.head_shadow.write().await;
123 *shadow = HeadItemShadow::Unknown;
124 self.inner.clear().await
125 }
126}