Skip to main content

drasi_index_rocksdb/
future_queue.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    hash::{Hash, Hasher},
17    sync::Arc,
18};
19
20use async_trait::async_trait;
21use drasi_core::{
22    interface::{FutureElementRef, FutureQueue, IndexError, PushType},
23    models::{ElementReference, ElementTimestamp},
24};
25use hashers::jenkins::spooky_hash::SpookyHasher;
26use prost::{bytes::Bytes, Message};
27use rocksdb::{
28    AsColumnFamilyRef, OptimisticTransactionDB, Options, ReadOptions, SliceTransform, Transaction,
29};
30use tokio::task;
31
32use crate::storage_models::{StoredElementReference, StoredFutureElementRef};
33use crate::RocksDbSessionState;
34
/// RocksDB future queue
///
/// Stores future element references in two column families of a shared
/// `OptimisticTransactionDB`.
///
/// Column Families
///     - fqueue [due_time + hash(future_element_ref)] -> future_element_ref {8 byte prefix (due_time)}
///     - findex [(position_in_query + group_signature) + hash(future_element_ref)] -> due_time {12 byte prefix (position_in_query(4) + group_signature(8))}
pub struct RocksDbFutureQueue {
    /// Shared database handle; column family handles are looked up from it on every call.
    db: Arc<OptimisticTransactionDB>,
    /// Session state whose `with_txn` supplies the transaction used by push, remove and pop.
    session_state: Arc<RocksDbSessionState>,
}
44
/// Name of the column family holding the queued entries (key: due time followed by entry hash).
const QUEUE_CF: &str = "fqueue";
/// Name of the column family indexing entries by query position and group signature.
const INDEX_CF: &str = "findex";
47
48impl RocksDbFutureQueue {
49    /// Create a new RocksDbFutureQueue from a shared database handle.
50    ///
51    /// The database must already have the required column families created.
52    /// Use `open_unified_db()` to open a database with all required CFs.
53    pub fn new(db: Arc<OptimisticTransactionDB>, session_state: Arc<RocksDbSessionState>) -> Self {
54        Self { db, session_state }
55    }
56}
57
58#[async_trait]
59impl FutureQueue for RocksDbFutureQueue {
60    async fn push(
61        &self,
62        push_type: PushType,
63        position_in_query: usize,
64        group_signature: u64,
65        element_ref: &ElementReference,
66        original_time: ElementTimestamp,
67        due_time: ElementTimestamp,
68    ) -> Result<bool, IndexError> {
69        let db = self.db.clone();
70        let session_state = self.session_state.clone();
71
72        let stored_element_ref: StoredElementReference = element_ref.into();
73
74        let task = task::spawn_blocking(move || {
75            let index_cf = db
76                .cf_handle(INDEX_CF)
77                .expect("findex Column family not found");
78            let queue_cf = db
79                .cf_handle(QUEUE_CF)
80                .expect("fqueue Column family not found");
81
82            let future_ref = StoredFutureElementRef {
83                element_ref: stored_element_ref,
84                original_time,
85                due_time,
86                position_in_query: position_in_query as u32,
87                group_signature,
88            };
89
90            let hash = {
91                let mut h = SpookyHasher::default();
92                let _ = &future_ref.hash(&mut h);
93                h.finish().to_be_bytes()
94            };
95
96            let prefix = encode_index_prefix(position_in_query as u32, group_signature);
97
98            session_state.with_txn(|txn| {
99                let should_push = match push_type {
100                    PushType::Always => true,
101                    PushType::IfNotExists => {
102                        let mut iter = txn.prefix_iterator_cf(&index_cf, prefix);
103                        match iter.next() {
104                            Some(item) => match item {
105                                Ok((key, _)) => key[0..12] != prefix,
106                                Err(e) => return Err(IndexError::other(e)),
107                            },
108                            None => true,
109                        }
110                    }
111                    PushType::Overwrite => {
112                        remove_internal(txn, &index_cf, &queue_cf, prefix)?;
113                        true
114                    }
115                };
116
117                if should_push {
118                    let index_key = {
119                        let mut buf = [0u8; 20];
120                        buf[0..12].copy_from_slice(&prefix);
121                        buf[12..20].copy_from_slice(&hash);
122                        buf
123                    };
124
125                    let queue_key = {
126                        let mut buf = [0u8; 16];
127                        buf[0..8].copy_from_slice(&due_time.to_be_bytes());
128                        buf[8..16].copy_from_slice(&hash);
129                        buf
130                    };
131
132                    txn.put_cf(&index_cf, index_key, due_time.to_be_bytes())
133                        .map_err(IndexError::other)?;
134                    txn.put_cf(&queue_cf, queue_key, future_ref.encode_to_vec())
135                        .map_err(IndexError::other)?;
136                }
137
138                Ok(should_push)
139            })
140        });
141
142        match task.await {
143            Ok(v) => v,
144            Err(e) => Err(IndexError::other(e)),
145        }
146    }
147
148    async fn remove(
149        &self,
150        position_in_query: usize,
151        group_signature: u64,
152    ) -> Result<(), IndexError> {
153        let db = self.db.clone();
154        let session_state = self.session_state.clone();
155
156        let task = task::spawn_blocking(move || {
157            let index_cf = db
158                .cf_handle(INDEX_CF)
159                .expect("findex column family not found");
160            let queue_cf = db
161                .cf_handle(QUEUE_CF)
162                .expect("fqueue column family not found");
163
164            let prefix = encode_index_prefix(position_in_query as u32, group_signature);
165
166            session_state.with_txn(|txn| remove_internal(txn, &index_cf, &queue_cf, prefix))
167        });
168
169        match task.await {
170            Ok(v) => v,
171            Err(e) => Err(IndexError::other(e)),
172        }
173    }
174
175    async fn pop(&self) -> Result<Option<FutureElementRef>, IndexError> {
176        let db = self.db.clone();
177        let session_state = self.session_state.clone();
178
179        let task = task::spawn_blocking(move || {
180            let index_cf = db
181                .cf_handle(INDEX_CF)
182                .expect("findex column family not found");
183            let queue_cf = db
184                .cf_handle(QUEUE_CF)
185                .expect("fqueue column family not found");
186
187            let read_opts = ReadOptions::default();
188
189            session_state.with_txn(|txn| {
190                let mut iter =
191                    txn.iterator_cf_opt(&queue_cf, read_opts, rocksdb::IteratorMode::Start);
192
193                let result = match iter.next() {
194                    Some(head) => match head {
195                        Ok((key, future_ref)) => {
196                            let hash = &key[8..];
197
198                            let stored_future_ref =
199                                match StoredFutureElementRef::decode(Bytes::from(future_ref)) {
200                                    Ok(v) => v,
201                                    Err(_) => return Err(IndexError::CorruptedData),
202                                };
203
204                            txn.delete_cf(&queue_cf, &key).map_err(IndexError::other)?;
205
206                            let prefix = encode_index_prefix(
207                                stored_future_ref.position_in_query,
208                                stored_future_ref.group_signature,
209                            );
210                            let index_key = {
211                                let mut buf = [0u8; 20];
212                                buf[0..12].copy_from_slice(&prefix);
213                                buf[12..20].copy_from_slice(hash);
214                                buf
215                            };
216
217                            txn.delete_cf(&index_cf, index_key)
218                                .map_err(IndexError::other)?;
219
220                            let future_ref: FutureElementRef = stored_future_ref.into();
221                            Some(future_ref)
222                        }
223                        Err(e) => return Err(IndexError::other(e)),
224                    },
225                    _ => None,
226                };
227
228                Ok(result)
229            })
230        });
231
232        match task.await {
233            Ok(v) => v,
234            Err(e) => Err(IndexError::other(e)),
235        }
236    }
237
238    async fn peek_due_time(&self) -> Result<Option<ElementTimestamp>, IndexError> {
239        let db = self.db.clone();
240
241        let task = task::spawn_blocking(move || {
242            let queue_cf = db
243                .cf_handle(QUEUE_CF)
244                .expect("fqueue Column family not found");
245
246            let read_opts = ReadOptions::default();
247            let mut iter = db.iterator_cf_opt(&queue_cf, read_opts, rocksdb::IteratorMode::Start);
248            parse_peek_head(iter.next())
249        });
250
251        match task.await {
252            Ok(v) => v,
253            Err(e) => Err(IndexError::other(e)),
254        }
255    }
256
257    async fn clear(&self) -> Result<(), IndexError> {
258        let db = self.db.clone();
259        let task = task::spawn_blocking(move || {
260            if let Err(err) = db.drop_cf(QUEUE_CF) {
261                return Err(IndexError::other(err));
262            }
263
264            if let Err(err) = db.create_cf(QUEUE_CF, &get_fqueue_cf_options()) {
265                return Err(IndexError::other(err));
266            }
267
268            if let Err(err) = db.drop_cf(INDEX_CF) {
269                return Err(IndexError::other(err));
270            }
271
272            if let Err(err) = db.create_cf(INDEX_CF, &get_findex_cf_options()) {
273                return Err(IndexError::other(err));
274            }
275            Ok(())
276        });
277
278        match task.await {
279            Ok(v) => v,
280            Err(e) => Err(IndexError::other(e)),
281        }
282    }
283}
284
285fn parse_peek_head(
286    head: Option<Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>>,
287) -> Result<Option<ElementTimestamp>, IndexError> {
288    if let Some(item) = head {
289        match item {
290            Ok((key, _)) => {
291                let due_time = u64::from_be_bytes(match key[0..8].try_into() {
292                    Ok(v) => v,
293                    Err(_) => return Err(IndexError::CorruptedData),
294                });
295                Ok(Some(due_time))
296            }
297            Err(e) => Err(IndexError::other(e)),
298        }
299    } else {
300        Ok(None)
301    }
302}
303
304fn remove_internal(
305    txn: &Transaction<OptimisticTransactionDB>,
306    index_cf: &impl AsColumnFamilyRef,
307    queue_cf: &impl AsColumnFamilyRef,
308    index_prefix: [u8; 12],
309) -> Result<(), IndexError> {
310    let iter = txn.prefix_iterator_cf(index_cf, index_prefix);
311
312    for item in iter {
313        match item {
314            Ok((key, due_time)) => {
315                if key[0..12] != index_prefix {
316                    break;
317                }
318
319                let hash = &key[12..];
320                let queue_key = [&due_time, hash].concat();
321
322                match txn.delete_cf(queue_cf, &queue_key) {
323                    Ok(_) => {}
324                    Err(e) => return Err(IndexError::other(e)),
325                };
326
327                match txn.delete_cf(index_cf, &key) {
328                    Ok(_) => {}
329                    Err(e) => return Err(IndexError::other(e)),
330                };
331            }
332            Err(e) => return Err(IndexError::other(e)),
333        }
334    }
335    Ok(())
336}
337
/// Builds the 12-byte `findex` key prefix: the query position as 4 big-endian bytes
/// followed by the group signature as 8 big-endian bytes.
fn encode_index_prefix(position_in_query: u32, group_signature: u64) -> [u8; 12] {
    let mut prefix = [0u8; 12];
    let (position_bytes, signature_bytes) = prefix.split_at_mut(4);
    position_bytes.copy_from_slice(&position_in_query.to_be_bytes());
    signature_bytes.copy_from_slice(&group_signature.to_be_bytes());
    prefix
}
344
345pub(crate) fn get_fqueue_cf_options() -> Options {
346    let mut opts = Options::default();
347    opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
348    opts
349}
350
351pub(crate) fn get_findex_cf_options() -> Options {
352    let mut opts = Options::default();
353    opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(12));
354    opts
355}
356
357/// Collect all column family descriptors needed by the future queue.
358pub(crate) fn future_queue_cf_descriptors() -> Vec<rocksdb::ColumnFamilyDescriptor> {
359    vec![
360        rocksdb::ColumnFamilyDescriptor::new(QUEUE_CF, get_fqueue_cf_options()),
361        rocksdb::ColumnFamilyDescriptor::new(INDEX_CF, get_findex_cf_options()),
362    ]
363}