drasi_core/index_cache/
shadowed_future_queue.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use tokio::sync::RwLock;
19
20use crate::{
21    interface::{FutureElementRef, FutureQueue, IndexError, PushType},
22    models::{ElementReference, ElementTimestamp},
23};
24
25enum HeadItemShadow {
26    Known(Option<ElementTimestamp>),
27    Unknown,
28}
29
30pub struct ShadowedFutureQueue {
31    inner: Arc<dyn FutureQueue>,
32    head_shadow: RwLock<HeadItemShadow>,
33}
34
35impl ShadowedFutureQueue {
36    pub fn new(inner: Arc<dyn FutureQueue>) -> Self {
37        Self {
38            inner,
39            head_shadow: RwLock::new(HeadItemShadow::Unknown),
40        }
41    }
42}
43
44#[async_trait]
45impl FutureQueue for ShadowedFutureQueue {
46    async fn push(
47        &self,
48        push_type: PushType,
49        position_in_query: usize,
50        group_signature: u64,
51        element_ref: &ElementReference,
52        original_time: ElementTimestamp,
53        due_time: ElementTimestamp,
54    ) -> Result<bool, IndexError> {
55        let result = self
56            .inner
57            .push(
58                push_type,
59                position_in_query,
60                group_signature,
61                element_ref,
62                original_time,
63                due_time,
64            )
65            .await?;
66
67        if push_type == PushType::Overwrite {
68            let mut head_shadow = self.head_shadow.write().await;
69            *head_shadow = HeadItemShadow::Unknown;
70            return Ok(result);
71        }
72
73        if result {
74            let mut head_shadow = self.head_shadow.write().await;
75
76            match *head_shadow {
77                HeadItemShadow::Known(Some(head_due_time)) => {
78                    if due_time < head_due_time {
79                        *head_shadow = HeadItemShadow::Known(Some(due_time));
80                    }
81                }
82                HeadItemShadow::Known(None) => {
83                    *head_shadow = HeadItemShadow::Known(Some(due_time));
84                }
85                HeadItemShadow::Unknown => {}
86            }
87        }
88
89        Ok(result)
90    }
91
92    async fn remove(
93        &self,
94        position_in_query: usize,
95        group_signature: u64,
96    ) -> Result<(), IndexError> {
97        let mut shadow = self.head_shadow.write().await;
98        *shadow = HeadItemShadow::Unknown;
99        self.inner.remove(position_in_query, group_signature).await
100    }
101
102    async fn pop(&self) -> Result<Option<FutureElementRef>, IndexError> {
103        let mut shadow = self.head_shadow.write().await;
104        *shadow = HeadItemShadow::Unknown;
105        self.inner.pop().await
106    }
107
108    async fn peek_due_time(&self) -> Result<Option<ElementTimestamp>, IndexError> {
109        let mut shadow = self.head_shadow.write().await;
110        match *shadow {
111            HeadItemShadow::Known(Some(head_due_time)) => Ok(Some(head_due_time)),
112            HeadItemShadow::Known(None) => Ok(None),
113            HeadItemShadow::Unknown => {
114                let result = self.inner.peek_due_time().await?;
115                *shadow = HeadItemShadow::Known(result);
116                Ok(result)
117            }
118        }
119    }
120
121    async fn clear(&self) -> Result<(), IndexError> {
122        let mut shadow = self.head_shadow.write().await;
123        *shadow = HeadItemShadow::Unknown;
124        self.inner.clear().await
125    }
126}