1use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
6fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
7 lock.read()
8}
9
10fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
11 lock.write()
12}
13
14impl RedDBRuntime {
15 pub fn execute_probabilistic_command(
16 &self,
17 raw_query: &str,
18 cmd: &ProbabilisticCommand,
19 ) -> RedDBResult<RuntimeQueryResult> {
20 let is_mutation = matches!(
24 cmd,
25 ProbabilisticCommand::CreateHll { .. }
26 | ProbabilisticCommand::HllAdd { .. }
27 | ProbabilisticCommand::HllMerge { .. }
28 | ProbabilisticCommand::DropHll { .. }
29 | ProbabilisticCommand::CreateSketch { .. }
30 | ProbabilisticCommand::SketchAdd { .. }
31 | ProbabilisticCommand::SketchMerge { .. }
32 | ProbabilisticCommand::DropSketch { .. }
33 | ProbabilisticCommand::CreateFilter { .. }
34 | ProbabilisticCommand::FilterAdd { .. }
35 | ProbabilisticCommand::FilterDelete { .. }
36 | ProbabilisticCommand::DropFilter { .. }
37 );
38 if is_mutation {
39 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
40 }
41 match cmd {
42 ProbabilisticCommand::CreateHll {
44 name,
45 if_not_exists,
46 } => {
47 let mut hlls =
48 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
49 if hlls.contains_key(name) {
50 if *if_not_exists {
51 return Ok(RuntimeQueryResult::ok_message(
52 raw_query.to_string(),
53 &format!("HLL '{}' already exists", name),
54 "create",
55 ));
56 }
57 return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
58 }
59 hlls.insert(
60 name.clone(),
61 crate::storage::primitives::hyperloglog::HyperLogLog::new(),
62 );
63 Ok(RuntimeQueryResult::ok_message(
64 raw_query.to_string(),
65 &format!("HLL '{}' created", name),
66 "create",
67 ))
68 }
69 ProbabilisticCommand::HllAdd { name, elements } => {
70 let mut hlls =
71 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
72 let hll = hlls
73 .get_mut(name)
74 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
75 for elem in elements {
76 hll.add(elem.as_bytes());
77 }
78 Ok(RuntimeQueryResult::ok_message(
79 raw_query.to_string(),
80 &format!("{} element(s) added to HLL '{}'", elements.len(), name),
81 "insert",
82 ))
83 }
84 ProbabilisticCommand::HllCount { names } => {
85 let hlls =
86 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
87 if names.len() == 1 {
88 let hll = hlls.get(&names[0]).ok_or_else(|| {
89 RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
90 })?;
91 let count = hll.count();
92 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
93 let mut record = UnifiedRecord::new();
94 record.set("count", Value::UnsignedInteger(count));
95 result.push(record);
96 Ok(RuntimeQueryResult {
97 query: raw_query.to_string(),
98 mode: QueryMode::Sql,
99 statement: "hll_count",
100 engine: "runtime-probabilistic",
101 result,
102 affected_rows: 0,
103 statement_type: "select",
104 })
105 } else {
106 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
108 for name in names {
109 let hll = hlls.get(name).ok_or_else(|| {
110 RedDBError::NotFound(format!("HLL '{}' not found", name))
111 })?;
112 merged.merge(hll);
113 }
114 let count = merged.count();
115 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
116 let mut record = UnifiedRecord::new();
117 record.set("count", Value::UnsignedInteger(count));
118 result.push(record);
119 Ok(RuntimeQueryResult {
120 query: raw_query.to_string(),
121 mode: QueryMode::Sql,
122 statement: "hll_count",
123 engine: "runtime-probabilistic",
124 result,
125 affected_rows: 0,
126 statement_type: "select",
127 })
128 }
129 }
130 ProbabilisticCommand::HllMerge { dest, sources } => {
131 let mut hlls =
132 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
133 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
134 for src in sources {
135 let hll = hlls
136 .get(src)
137 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
138 merged.merge(hll);
139 }
140 hlls.insert(dest.clone(), merged);
141 Ok(RuntimeQueryResult::ok_message(
142 raw_query.to_string(),
143 &format!(
144 "HLL '{}' created from merge of {}",
145 dest,
146 sources.join(", ")
147 ),
148 "create",
149 ))
150 }
151 ProbabilisticCommand::HllInfo { name } => {
152 let hlls =
153 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
154 let hll = hlls
155 .get(name)
156 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
157 let mut result = UnifiedResult::with_columns(vec![
158 "name".into(),
159 "count".into(),
160 "memory_bytes".into(),
161 ]);
162 let mut record = UnifiedRecord::new();
163 record.set("name", Value::text(name.clone()));
164 record.set("count", Value::UnsignedInteger(hll.count()));
165 record.set(
166 "memory_bytes",
167 Value::UnsignedInteger(hll.memory_bytes() as u64),
168 );
169 result.push(record);
170 Ok(RuntimeQueryResult {
171 query: raw_query.to_string(),
172 mode: QueryMode::Sql,
173 statement: "hll_info",
174 engine: "runtime-probabilistic",
175 result,
176 affected_rows: 0,
177 statement_type: "select",
178 })
179 }
180 ProbabilisticCommand::DropHll { name, if_exists } => {
181 let mut hlls =
182 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
183 if hlls.remove(name).is_none() {
184 if *if_exists {
185 return Ok(RuntimeQueryResult::ok_message(
186 raw_query.to_string(),
187 &format!("HLL '{}' does not exist", name),
188 "drop",
189 ));
190 }
191 return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
192 }
193 Ok(RuntimeQueryResult::ok_message(
194 raw_query.to_string(),
195 &format!("HLL '{}' dropped", name),
196 "drop",
197 ))
198 }
199
200 ProbabilisticCommand::CreateSketch {
202 name,
203 width,
204 depth,
205 if_not_exists,
206 } => {
207 let mut sketches = probabilistic_write(
208 &self.inner.probabilistic.sketches,
209 "probabilistic sketch store",
210 );
211 if sketches.contains_key(name) {
212 if *if_not_exists {
213 return Ok(RuntimeQueryResult::ok_message(
214 raw_query.to_string(),
215 &format!("SKETCH '{}' already exists", name),
216 "create",
217 ));
218 }
219 return Err(RedDBError::Query(format!(
220 "SKETCH '{}' already exists",
221 name
222 )));
223 }
224 sketches.insert(
225 name.clone(),
226 crate::storage::primitives::count_min_sketch::CountMinSketch::new(
227 *width, *depth,
228 ),
229 );
230 Ok(RuntimeQueryResult::ok_message(
231 raw_query.to_string(),
232 &format!(
233 "SKETCH '{}' created (width={}, depth={})",
234 name, width, depth
235 ),
236 "create",
237 ))
238 }
239 ProbabilisticCommand::SketchAdd {
240 name,
241 element,
242 count,
243 } => {
244 let mut sketches = probabilistic_write(
245 &self.inner.probabilistic.sketches,
246 "probabilistic sketch store",
247 );
248 let sketch = sketches
249 .get_mut(name)
250 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
251 sketch.add(element.as_bytes(), *count);
252 Ok(RuntimeQueryResult::ok_message(
253 raw_query.to_string(),
254 &format!("added {} to SKETCH '{}'", count, name),
255 "insert",
256 ))
257 }
258 ProbabilisticCommand::SketchCount { name, element } => {
259 let sketches = probabilistic_read(
260 &self.inner.probabilistic.sketches,
261 "probabilistic sketch store",
262 );
263 let sketch = sketches
264 .get(name)
265 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
266 let estimate = sketch.estimate(element.as_bytes());
267 let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
268 let mut record = UnifiedRecord::new();
269 record.set("estimate", Value::UnsignedInteger(estimate));
270 result.push(record);
271 Ok(RuntimeQueryResult {
272 query: raw_query.to_string(),
273 mode: QueryMode::Sql,
274 statement: "sketch_count",
275 engine: "runtime-probabilistic",
276 result,
277 affected_rows: 0,
278 statement_type: "select",
279 })
280 }
281 ProbabilisticCommand::SketchMerge { dest, sources } => {
282 let mut sketches = probabilistic_write(
283 &self.inner.probabilistic.sketches,
284 "probabilistic sketch store",
285 );
286 let first_src = sketches.get(&sources[0]).ok_or_else(|| {
287 RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
288 })?;
289 let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
290 first_src.width(),
291 first_src.depth(),
292 );
293 for src in sources {
294 let sketch = sketches.get(src).ok_or_else(|| {
295 RedDBError::NotFound(format!("SKETCH '{}' not found", src))
296 })?;
297 if !merged.merge(sketch) {
298 return Err(RedDBError::Query(format!(
299 "SKETCH '{}' has incompatible dimensions",
300 src
301 )));
302 }
303 }
304 sketches.insert(dest.clone(), merged);
305 Ok(RuntimeQueryResult::ok_message(
306 raw_query.to_string(),
307 &format!(
308 "SKETCH '{}' created from merge of {}",
309 dest,
310 sources.join(", ")
311 ),
312 "create",
313 ))
314 }
315 ProbabilisticCommand::SketchInfo { name } => {
316 let sketches = probabilistic_read(
317 &self.inner.probabilistic.sketches,
318 "probabilistic sketch store",
319 );
320 let sketch = sketches
321 .get(name)
322 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
323 let mut result = UnifiedResult::with_columns(vec![
324 "name".into(),
325 "width".into(),
326 "depth".into(),
327 "total".into(),
328 "memory_bytes".into(),
329 ]);
330 let mut record = UnifiedRecord::new();
331 record.set("name", Value::text(name.clone()));
332 record.set("width", Value::UnsignedInteger(sketch.width() as u64));
333 record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
334 record.set("total", Value::UnsignedInteger(sketch.total()));
335 record.set(
336 "memory_bytes",
337 Value::UnsignedInteger(sketch.memory_bytes() as u64),
338 );
339 result.push(record);
340 Ok(RuntimeQueryResult {
341 query: raw_query.to_string(),
342 mode: QueryMode::Sql,
343 statement: "sketch_info",
344 engine: "runtime-probabilistic",
345 result,
346 affected_rows: 0,
347 statement_type: "select",
348 })
349 }
350 ProbabilisticCommand::DropSketch { name, if_exists } => {
351 let mut sketches = probabilistic_write(
352 &self.inner.probabilistic.sketches,
353 "probabilistic sketch store",
354 );
355 if sketches.remove(name).is_none() {
356 if *if_exists {
357 return Ok(RuntimeQueryResult::ok_message(
358 raw_query.to_string(),
359 &format!("SKETCH '{}' does not exist", name),
360 "drop",
361 ));
362 }
363 return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
364 }
365 Ok(RuntimeQueryResult::ok_message(
366 raw_query.to_string(),
367 &format!("SKETCH '{}' dropped", name),
368 "drop",
369 ))
370 }
371
372 ProbabilisticCommand::CreateFilter {
374 name,
375 capacity,
376 if_not_exists,
377 } => {
378 let mut filters = probabilistic_write(
379 &self.inner.probabilistic.filters,
380 "probabilistic filter store",
381 );
382 if filters.contains_key(name) {
383 if *if_not_exists {
384 return Ok(RuntimeQueryResult::ok_message(
385 raw_query.to_string(),
386 &format!("FILTER '{}' already exists", name),
387 "create",
388 ));
389 }
390 return Err(RedDBError::Query(format!(
391 "FILTER '{}' already exists",
392 name
393 )));
394 }
395 filters.insert(
396 name.clone(),
397 crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity),
398 );
399 Ok(RuntimeQueryResult::ok_message(
400 raw_query.to_string(),
401 &format!("FILTER '{}' created (capacity={})", name, capacity),
402 "create",
403 ))
404 }
405 ProbabilisticCommand::FilterAdd { name, element } => {
406 let mut filters = probabilistic_write(
407 &self.inner.probabilistic.filters,
408 "probabilistic filter store",
409 );
410 let filter = filters
411 .get_mut(name)
412 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
413 if !filter.insert(element.as_bytes()) {
414 return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
415 }
416 Ok(RuntimeQueryResult::ok_message(
417 raw_query.to_string(),
418 &format!("element added to FILTER '{}'", name),
419 "insert",
420 ))
421 }
422 ProbabilisticCommand::FilterCheck { name, element } => {
423 let filters = probabilistic_read(
424 &self.inner.probabilistic.filters,
425 "probabilistic filter store",
426 );
427 let filter = filters
428 .get(name)
429 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
430 let exists = filter.contains(element.as_bytes());
431 let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
432 let mut record = UnifiedRecord::new();
433 record.set("exists", Value::Boolean(exists));
434 result.push(record);
435 Ok(RuntimeQueryResult {
436 query: raw_query.to_string(),
437 mode: QueryMode::Sql,
438 statement: "filter_check",
439 engine: "runtime-probabilistic",
440 result,
441 affected_rows: 0,
442 statement_type: "select",
443 })
444 }
445 ProbabilisticCommand::FilterDelete { name, element } => {
446 let mut filters = probabilistic_write(
447 &self.inner.probabilistic.filters,
448 "probabilistic filter store",
449 );
450 let filter = filters
451 .get_mut(name)
452 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
453 let removed = filter.delete(element.as_bytes());
454 Ok(RuntimeQueryResult::ok_message(
455 raw_query.to_string(),
456 &format!(
457 "element {} from FILTER '{}'",
458 if removed { "deleted" } else { "not found in" },
459 name
460 ),
461 "delete",
462 ))
463 }
464 ProbabilisticCommand::FilterCount { name } => {
465 let filters = probabilistic_read(
466 &self.inner.probabilistic.filters,
467 "probabilistic filter store",
468 );
469 let filter = filters
470 .get(name)
471 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
472 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
473 let mut record = UnifiedRecord::new();
474 record.set("count", Value::UnsignedInteger(filter.count() as u64));
475 result.push(record);
476 Ok(RuntimeQueryResult {
477 query: raw_query.to_string(),
478 mode: QueryMode::Sql,
479 statement: "filter_count",
480 engine: "runtime-probabilistic",
481 result,
482 affected_rows: 0,
483 statement_type: "select",
484 })
485 }
486 ProbabilisticCommand::FilterInfo { name } => {
487 let filters = probabilistic_read(
488 &self.inner.probabilistic.filters,
489 "probabilistic filter store",
490 );
491 let filter = filters
492 .get(name)
493 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
494 let mut result = UnifiedResult::with_columns(vec![
495 "name".into(),
496 "count".into(),
497 "load_factor".into(),
498 "memory_bytes".into(),
499 ]);
500 let mut record = UnifiedRecord::new();
501 record.set("name", Value::text(name.clone()));
502 record.set("count", Value::UnsignedInteger(filter.count() as u64));
503 record.set("load_factor", Value::Float(filter.load_factor()));
504 record.set(
505 "memory_bytes",
506 Value::UnsignedInteger(filter.memory_bytes() as u64),
507 );
508 result.push(record);
509 Ok(RuntimeQueryResult {
510 query: raw_query.to_string(),
511 mode: QueryMode::Sql,
512 statement: "filter_info",
513 engine: "runtime-probabilistic",
514 result,
515 affected_rows: 0,
516 statement_type: "select",
517 })
518 }
519 ProbabilisticCommand::DropFilter { name, if_exists } => {
520 let mut filters = probabilistic_write(
521 &self.inner.probabilistic.filters,
522 "probabilistic filter store",
523 );
524 if filters.remove(name).is_none() {
525 if *if_exists {
526 return Ok(RuntimeQueryResult::ok_message(
527 raw_query.to_string(),
528 &format!("FILTER '{}' does not exist", name),
529 "drop",
530 ));
531 }
532 return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
533 }
534 Ok(RuntimeQueryResult::ok_message(
535 raw_query.to_string(),
536 &format!("FILTER '{}' dropped", name),
537 "drop",
538 ))
539 }
540 }
541 }
542}