1use std::{
16 hash::{Hash, Hasher},
17 sync::Arc,
18};
19
20use async_trait::async_trait;
21use drasi_core::{
22 interface::{FutureElementRef, FutureQueue, IndexError, PushType},
23 models::{ElementReference, ElementTimestamp},
24};
25use hashers::jenkins::spooky_hash::SpookyHasher;
26use prost::{bytes::Bytes, Message};
27use rocksdb::{
28 AsColumnFamilyRef, OptimisticTransactionDB, Options, ReadOptions, SliceTransform, Transaction,
29};
30use tokio::task;
31
32use crate::storage_models::{StoredElementReference, StoredFutureElementRef};
33use crate::RocksDbSessionState;
34
/// RocksDB-backed implementation of the `FutureQueue` trait.
///
/// Entries are kept in two column families: `fqueue` holds the encoded
/// future element references keyed by due time, and `findex` maps a
/// (position in query, group signature) prefix to the due time of each entry.
pub struct RocksDbFutureQueue {
    /// Shared database handle, used to resolve column families and for the
    /// direct reads and column-family resets done by this type.
    db: Arc<OptimisticTransactionDB>,
    /// Session state whose `with_txn` supplies the transaction used by the
    /// mutating operations (`push`, `remove`, `pop`).
    session_state: Arc<RocksDbSessionState>,
}
44
/// Column family holding the queued entries.
/// Key: 8-byte big-endian due time followed by the 8-byte entry hash.
/// Value: the encoded `StoredFutureElementRef`.
const QUEUE_CF: &str = "fqueue";
/// Column family indexing entries by (position in query, group signature).
/// Key: 12-byte prefix from `encode_index_prefix` followed by the 8-byte entry hash.
/// Value: the 8-byte big-endian due time of the entry.
const INDEX_CF: &str = "findex";
47
48impl RocksDbFutureQueue {
49 pub fn new(db: Arc<OptimisticTransactionDB>, session_state: Arc<RocksDbSessionState>) -> Self {
54 Self { db, session_state }
55 }
56}
57
58#[async_trait]
59impl FutureQueue for RocksDbFutureQueue {
60 async fn push(
61 &self,
62 push_type: PushType,
63 position_in_query: usize,
64 group_signature: u64,
65 element_ref: &ElementReference,
66 original_time: ElementTimestamp,
67 due_time: ElementTimestamp,
68 ) -> Result<bool, IndexError> {
69 let db = self.db.clone();
70 let session_state = self.session_state.clone();
71
72 let stored_element_ref: StoredElementReference = element_ref.into();
73
74 let task = task::spawn_blocking(move || {
75 let index_cf = db
76 .cf_handle(INDEX_CF)
77 .expect("findex Column family not found");
78 let queue_cf = db
79 .cf_handle(QUEUE_CF)
80 .expect("fqueue Column family not found");
81
82 let future_ref = StoredFutureElementRef {
83 element_ref: stored_element_ref,
84 original_time,
85 due_time,
86 position_in_query: position_in_query as u32,
87 group_signature,
88 };
89
90 let hash = {
91 let mut h = SpookyHasher::default();
92 let _ = &future_ref.hash(&mut h);
93 h.finish().to_be_bytes()
94 };
95
96 let prefix = encode_index_prefix(position_in_query as u32, group_signature);
97
98 session_state.with_txn(|txn| {
99 let should_push = match push_type {
100 PushType::Always => true,
101 PushType::IfNotExists => {
102 let mut iter = txn.prefix_iterator_cf(&index_cf, prefix);
103 match iter.next() {
104 Some(item) => match item {
105 Ok((key, _)) => key[0..12] != prefix,
106 Err(e) => return Err(IndexError::other(e)),
107 },
108 None => true,
109 }
110 }
111 PushType::Overwrite => {
112 remove_internal(txn, &index_cf, &queue_cf, prefix)?;
113 true
114 }
115 };
116
117 if should_push {
118 let index_key = {
119 let mut buf = [0u8; 20];
120 buf[0..12].copy_from_slice(&prefix);
121 buf[12..20].copy_from_slice(&hash);
122 buf
123 };
124
125 let queue_key = {
126 let mut buf = [0u8; 16];
127 buf[0..8].copy_from_slice(&due_time.to_be_bytes());
128 buf[8..16].copy_from_slice(&hash);
129 buf
130 };
131
132 txn.put_cf(&index_cf, index_key, due_time.to_be_bytes())
133 .map_err(IndexError::other)?;
134 txn.put_cf(&queue_cf, queue_key, future_ref.encode_to_vec())
135 .map_err(IndexError::other)?;
136 }
137
138 Ok(should_push)
139 })
140 });
141
142 match task.await {
143 Ok(v) => v,
144 Err(e) => Err(IndexError::other(e)),
145 }
146 }
147
148 async fn remove(
149 &self,
150 position_in_query: usize,
151 group_signature: u64,
152 ) -> Result<(), IndexError> {
153 let db = self.db.clone();
154 let session_state = self.session_state.clone();
155
156 let task = task::spawn_blocking(move || {
157 let index_cf = db
158 .cf_handle(INDEX_CF)
159 .expect("findex column family not found");
160 let queue_cf = db
161 .cf_handle(QUEUE_CF)
162 .expect("fqueue column family not found");
163
164 let prefix = encode_index_prefix(position_in_query as u32, group_signature);
165
166 session_state.with_txn(|txn| remove_internal(txn, &index_cf, &queue_cf, prefix))
167 });
168
169 match task.await {
170 Ok(v) => v,
171 Err(e) => Err(IndexError::other(e)),
172 }
173 }
174
175 async fn pop(&self) -> Result<Option<FutureElementRef>, IndexError> {
176 let db = self.db.clone();
177 let session_state = self.session_state.clone();
178
179 let task = task::spawn_blocking(move || {
180 let index_cf = db
181 .cf_handle(INDEX_CF)
182 .expect("findex column family not found");
183 let queue_cf = db
184 .cf_handle(QUEUE_CF)
185 .expect("fqueue column family not found");
186
187 let read_opts = ReadOptions::default();
188
189 session_state.with_txn(|txn| {
190 let mut iter =
191 txn.iterator_cf_opt(&queue_cf, read_opts, rocksdb::IteratorMode::Start);
192
193 let result = match iter.next() {
194 Some(head) => match head {
195 Ok((key, future_ref)) => {
196 let hash = &key[8..];
197
198 let stored_future_ref =
199 match StoredFutureElementRef::decode(Bytes::from(future_ref)) {
200 Ok(v) => v,
201 Err(_) => return Err(IndexError::CorruptedData),
202 };
203
204 txn.delete_cf(&queue_cf, &key).map_err(IndexError::other)?;
205
206 let prefix = encode_index_prefix(
207 stored_future_ref.position_in_query,
208 stored_future_ref.group_signature,
209 );
210 let index_key = {
211 let mut buf = [0u8; 20];
212 buf[0..12].copy_from_slice(&prefix);
213 buf[12..20].copy_from_slice(hash);
214 buf
215 };
216
217 txn.delete_cf(&index_cf, index_key)
218 .map_err(IndexError::other)?;
219
220 let future_ref: FutureElementRef = stored_future_ref.into();
221 Some(future_ref)
222 }
223 Err(e) => return Err(IndexError::other(e)),
224 },
225 _ => None,
226 };
227
228 Ok(result)
229 })
230 });
231
232 match task.await {
233 Ok(v) => v,
234 Err(e) => Err(IndexError::other(e)),
235 }
236 }
237
238 async fn peek_due_time(&self) -> Result<Option<ElementTimestamp>, IndexError> {
239 let db = self.db.clone();
240
241 let task = task::spawn_blocking(move || {
242 let queue_cf = db
243 .cf_handle(QUEUE_CF)
244 .expect("fqueue Column family not found");
245
246 let read_opts = ReadOptions::default();
247 let mut iter = db.iterator_cf_opt(&queue_cf, read_opts, rocksdb::IteratorMode::Start);
248 parse_peek_head(iter.next())
249 });
250
251 match task.await {
252 Ok(v) => v,
253 Err(e) => Err(IndexError::other(e)),
254 }
255 }
256
257 async fn clear(&self) -> Result<(), IndexError> {
258 let db = self.db.clone();
259 let task = task::spawn_blocking(move || {
260 if let Err(err) = db.drop_cf(QUEUE_CF) {
261 return Err(IndexError::other(err));
262 }
263
264 if let Err(err) = db.create_cf(QUEUE_CF, &get_fqueue_cf_options()) {
265 return Err(IndexError::other(err));
266 }
267
268 if let Err(err) = db.drop_cf(INDEX_CF) {
269 return Err(IndexError::other(err));
270 }
271
272 if let Err(err) = db.create_cf(INDEX_CF, &get_findex_cf_options()) {
273 return Err(IndexError::other(err));
274 }
275 Ok(())
276 });
277
278 match task.await {
279 Ok(v) => v,
280 Err(e) => Err(IndexError::other(e)),
281 }
282 }
283}
284
285fn parse_peek_head(
286 head: Option<Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>>,
287) -> Result<Option<ElementTimestamp>, IndexError> {
288 if let Some(item) = head {
289 match item {
290 Ok((key, _)) => {
291 let due_time = u64::from_be_bytes(match key[0..8].try_into() {
292 Ok(v) => v,
293 Err(_) => return Err(IndexError::CorruptedData),
294 });
295 Ok(Some(due_time))
296 }
297 Err(e) => Err(IndexError::other(e)),
298 }
299 } else {
300 Ok(None)
301 }
302}
303
304fn remove_internal(
305 txn: &Transaction<OptimisticTransactionDB>,
306 index_cf: &impl AsColumnFamilyRef,
307 queue_cf: &impl AsColumnFamilyRef,
308 index_prefix: [u8; 12],
309) -> Result<(), IndexError> {
310 let iter = txn.prefix_iterator_cf(index_cf, index_prefix);
311
312 for item in iter {
313 match item {
314 Ok((key, due_time)) => {
315 if key[0..12] != index_prefix {
316 break;
317 }
318
319 let hash = &key[12..];
320 let queue_key = [&due_time, hash].concat();
321
322 match txn.delete_cf(queue_cf, &queue_key) {
323 Ok(_) => {}
324 Err(e) => return Err(IndexError::other(e)),
325 };
326
327 match txn.delete_cf(index_cf, &key) {
328 Ok(_) => {}
329 Err(e) => return Err(IndexError::other(e)),
330 };
331 }
332 Err(e) => return Err(IndexError::other(e)),
333 }
334 }
335 Ok(())
336}
337
/// Builds the 12-byte index key prefix for a (position, group signature)
/// pair: the position as 4 big-endian bytes followed by the group signature
/// as 8 big-endian bytes.
fn encode_index_prefix(position_in_query: u32, group_signature: u64) -> [u8; 12] {
    let mut prefix = [0u8; 12];
    let (position_bytes, signature_bytes) = prefix.split_at_mut(4);
    position_bytes.copy_from_slice(&position_in_query.to_be_bytes());
    signature_bytes.copy_from_slice(&group_signature.to_be_bytes());
    prefix
}
344
345pub(crate) fn get_fqueue_cf_options() -> Options {
346 let mut opts = Options::default();
347 opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(8));
348 opts
349}
350
351pub(crate) fn get_findex_cf_options() -> Options {
352 let mut opts = Options::default();
353 opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(12));
354 opts
355}
356
357pub(crate) fn future_queue_cf_descriptors() -> Vec<rocksdb::ColumnFamilyDescriptor> {
359 vec![
360 rocksdb::ColumnFamilyDescriptor::new(QUEUE_CF, get_fqueue_cf_options()),
361 rocksdb::ColumnFamilyDescriptor::new(INDEX_CF, get_findex_cf_options()),
362 ]
363}