1use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
6fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
7 lock.read()
8}
9
10fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
11 lock.write()
12}
13
14fn probabilistic_collection_contract(
15 name: &str,
16 model: crate::catalog::CollectionModel,
17) -> crate::physical::CollectionContract {
18 let now = crate::utils::now_unix_millis() as u128;
19 crate::physical::CollectionContract {
20 name: name.to_string(),
21 declared_model: model,
22 schema_mode: crate::catalog::SchemaMode::Dynamic,
23 origin: crate::physical::ContractOrigin::Explicit,
24 version: 1,
25 created_at_unix_ms: now,
26 updated_at_unix_ms: now,
27 default_ttl_ms: None,
28 vector_dimension: None,
29 vector_metric: None,
30 context_index_fields: Vec::new(),
31 declared_columns: Vec::new(),
32 table_def: None,
33 timestamps_enabled: false,
34 context_index_enabled: false,
35 append_only: false,
36 subscriptions: Vec::new(),
37 }
38}
39
/// A probabilistic read form recognised in a `SELECT` projection list.
///
/// In every variant `label` is the output column name under which the value
/// is stored in the result row.
enum ProbabilisticReadProjection {
    /// The `CARDINALITY` column form; validated against HLL collections.
    Cardinality { label: String },
    /// `FREQ(<literal>)`; validated against sketch collections. `element` is
    /// the literal whose frequency is estimated.
    Freq { element: String, label: String },
    /// `CONTAINS(<literal>)`; validated against filter collections. `element`
    /// is the literal whose membership is tested.
    Contains { element: String, label: String },
}
45
46impl RedDBRuntime {
47 fn create_probabilistic_catalog_entry(
48 &self,
49 name: &str,
50 model: crate::catalog::CollectionModel,
51 ) -> RedDBResult<()> {
52 let store = self.inner.db.store();
53 store
54 .create_collection(name)
55 .map_err(|err| RedDBError::Internal(err.to_string()))?;
56 self.inner
57 .db
58 .save_collection_contract(probabilistic_collection_contract(name, model))
59 .map_err(|err| RedDBError::Internal(err.to_string()))?;
60 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
61 store.set_config_tree(
62 &format!("red.collection_tenants.{name}"),
63 &crate::serde_json::Value::String(tenant_id),
64 );
65 }
66 self.inner
67 .db
68 .persist_metadata()
69 .map_err(|err| RedDBError::Internal(err.to_string()))?;
70 self.invalidate_result_cache();
71 Ok(())
72 }
73
74 fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
75 let store = self.inner.db.store();
76 if store.get_collection(name).is_some() {
77 store
78 .drop_collection(name)
79 .map_err(|err| RedDBError::Internal(err.to_string()))?;
80 }
81 self.inner
82 .db
83 .remove_collection_contract(name)
84 .map_err(|err| RedDBError::Internal(err.to_string()))?;
85 self.inner
86 .db
87 .persist_metadata()
88 .map_err(|err| RedDBError::Internal(err.to_string()))?;
89 self.invalidate_result_cache();
90 Ok(())
91 }
92
93 pub(crate) fn execute_probabilistic_select(
94 &self,
95 query: &TableQuery,
96 ) -> RedDBResult<Option<UnifiedResult>> {
97 let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
98 let mut read_projections = Vec::new();
99 for projection in &projections {
100 if let Some(read_projection) =
101 parse_probabilistic_read_projection(projection, read_projections.len())?
102 {
103 read_projections.push(read_projection);
104 }
105 }
106
107 let Some(actual_model) = self
108 .inner
109 .db
110 .collection_contract(&query.table)
111 .map(|contract| contract.declared_model)
112 else {
113 return if read_projections.is_empty() {
114 Ok(None)
115 } else {
116 Err(RedDBError::NotFound(format!(
117 "probabilistic collection '{}' not found",
118 query.table
119 )))
120 };
121 };
122
123 let is_probabilistic_model = matches!(
124 actual_model,
125 crate::catalog::CollectionModel::Hll
126 | crate::catalog::CollectionModel::Sketch
127 | crate::catalog::CollectionModel::Filter
128 );
129 if read_projections.is_empty() {
130 return if is_probabilistic_model {
131 Err(RedDBError::Query(format!(
132 "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
133 query.table
134 )))
135 } else {
136 Ok(None)
137 };
138 }
139
140 validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
141 let (columns, record) =
142 self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
143 let mut result = UnifiedResult::with_columns(columns);
144 if probabilistic_select_row_visible(self, query, &record) {
145 result.push(record);
146 }
147 Ok(Some(result))
148 }
149
150 pub fn execute_probabilistic_command(
151 &self,
152 raw_query: &str,
153 cmd: &ProbabilisticCommand,
154 ) -> RedDBResult<RuntimeQueryResult> {
155 let is_mutation = matches!(
159 cmd,
160 ProbabilisticCommand::CreateHll { .. }
161 | ProbabilisticCommand::HllAdd { .. }
162 | ProbabilisticCommand::HllMerge { .. }
163 | ProbabilisticCommand::DropHll { .. }
164 | ProbabilisticCommand::CreateSketch { .. }
165 | ProbabilisticCommand::SketchAdd { .. }
166 | ProbabilisticCommand::SketchMerge { .. }
167 | ProbabilisticCommand::DropSketch { .. }
168 | ProbabilisticCommand::CreateFilter { .. }
169 | ProbabilisticCommand::FilterAdd { .. }
170 | ProbabilisticCommand::FilterDelete { .. }
171 | ProbabilisticCommand::DropFilter { .. }
172 );
173 if is_mutation {
174 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
175 }
176 match cmd {
177 ProbabilisticCommand::CreateHll {
179 name,
180 precision,
181 if_not_exists,
182 } => {
183 let mut hlls =
184 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
185 if hlls.contains_key(name) {
186 if *if_not_exists {
187 return Ok(RuntimeQueryResult::ok_message(
188 raw_query.to_string(),
189 &format!("HLL '{}' already exists", name),
190 "create",
191 ));
192 }
193 return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
194 }
195 let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
196 *precision,
197 )
198 .ok_or_else(|| {
199 RedDBError::Query(format!(
200 "HLL precision must be between 4 and 18, got {precision}"
201 ))
202 })?;
203 self.create_probabilistic_catalog_entry(
204 name,
205 crate::catalog::CollectionModel::Hll,
206 )?;
207 hlls.insert(name.clone(), hll);
208 Ok(RuntimeQueryResult::ok_message(
209 raw_query.to_string(),
210 &format!("HLL '{}' created", name),
211 "create",
212 ))
213 }
214 ProbabilisticCommand::HllAdd { name, elements } => {
215 let mut hlls =
216 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
217 let hll = hlls
218 .get_mut(name)
219 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
220 for elem in elements {
221 hll.add(elem.as_bytes());
222 }
223 Ok(RuntimeQueryResult::ok_message(
224 raw_query.to_string(),
225 &format!("{} element(s) added to HLL '{}'", elements.len(), name),
226 "insert",
227 ))
228 }
229 ProbabilisticCommand::HllCount { names } => {
230 let hlls =
231 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
232 if names.len() == 1 {
233 let hll = hlls.get(&names[0]).ok_or_else(|| {
234 RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
235 })?;
236 let count = hll.count();
237 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
238 let mut record = UnifiedRecord::new();
239 record.set("count", Value::UnsignedInteger(count));
240 result.push(record);
241 Ok(RuntimeQueryResult {
242 query: raw_query.to_string(),
243 mode: QueryMode::Sql,
244 statement: "hll_count",
245 engine: "runtime-probabilistic",
246 result,
247 affected_rows: 0,
248 statement_type: "select",
249 })
250 } else {
251 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
253 for name in names {
254 let hll = hlls.get(name).ok_or_else(|| {
255 RedDBError::NotFound(format!("HLL '{}' not found", name))
256 })?;
257 merged.merge(hll);
258 }
259 let count = merged.count();
260 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
261 let mut record = UnifiedRecord::new();
262 record.set("count", Value::UnsignedInteger(count));
263 result.push(record);
264 Ok(RuntimeQueryResult {
265 query: raw_query.to_string(),
266 mode: QueryMode::Sql,
267 statement: "hll_count",
268 engine: "runtime-probabilistic",
269 result,
270 affected_rows: 0,
271 statement_type: "select",
272 })
273 }
274 }
275 ProbabilisticCommand::HllMerge { dest, sources } => {
276 let mut hlls =
277 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
278 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
279 for src in sources {
280 let hll = hlls
281 .get(src)
282 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
283 merged.merge(hll);
284 }
285 hlls.insert(dest.clone(), merged);
286 Ok(RuntimeQueryResult::ok_message(
287 raw_query.to_string(),
288 &format!(
289 "HLL '{}' created from merge of {}",
290 dest,
291 sources.join(", ")
292 ),
293 "create",
294 ))
295 }
296 ProbabilisticCommand::HllInfo { name } => {
297 let hlls =
298 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
299 let hll = hlls
300 .get(name)
301 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
302 let mut result = UnifiedResult::with_columns(vec![
303 "name".into(),
304 "precision".into(),
305 "count".into(),
306 "memory_bytes".into(),
307 ]);
308 let mut record = UnifiedRecord::new();
309 record.set("name", Value::text(name.clone()));
310 record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
311 record.set("count", Value::UnsignedInteger(hll.count()));
312 record.set(
313 "memory_bytes",
314 Value::UnsignedInteger(hll.memory_bytes() as u64),
315 );
316 result.push(record);
317 Ok(RuntimeQueryResult {
318 query: raw_query.to_string(),
319 mode: QueryMode::Sql,
320 statement: "hll_info",
321 engine: "runtime-probabilistic",
322 result,
323 affected_rows: 0,
324 statement_type: "select",
325 })
326 }
327 ProbabilisticCommand::DropHll { name, if_exists } => {
328 let mut hlls =
329 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
330 if hlls.remove(name).is_none() {
331 if *if_exists {
332 return Ok(RuntimeQueryResult::ok_message(
333 raw_query.to_string(),
334 &format!("HLL '{}' does not exist", name),
335 "drop",
336 ));
337 }
338 return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
339 }
340 self.drop_probabilistic_catalog_entry(name)?;
341 Ok(RuntimeQueryResult::ok_message(
342 raw_query.to_string(),
343 &format!("HLL '{}' dropped", name),
344 "drop",
345 ))
346 }
347
348 ProbabilisticCommand::CreateSketch {
350 name,
351 width,
352 depth,
353 if_not_exists,
354 } => {
355 let mut sketches = probabilistic_write(
356 &self.inner.probabilistic.sketches,
357 "probabilistic sketch store",
358 );
359 if sketches.contains_key(name) {
360 if *if_not_exists {
361 return Ok(RuntimeQueryResult::ok_message(
362 raw_query.to_string(),
363 &format!("SKETCH '{}' already exists", name),
364 "create",
365 ));
366 }
367 return Err(RedDBError::Query(format!(
368 "SKETCH '{}' already exists",
369 name
370 )));
371 }
372 self.create_probabilistic_catalog_entry(
373 name,
374 crate::catalog::CollectionModel::Sketch,
375 )?;
376 sketches.insert(
377 name.clone(),
378 crate::storage::primitives::count_min_sketch::CountMinSketch::new(
379 *width, *depth,
380 ),
381 );
382 Ok(RuntimeQueryResult::ok_message(
383 raw_query.to_string(),
384 &format!(
385 "SKETCH '{}' created (width={}, depth={})",
386 name, width, depth
387 ),
388 "create",
389 ))
390 }
391 ProbabilisticCommand::SketchAdd {
392 name,
393 element,
394 count,
395 } => {
396 let mut sketches = probabilistic_write(
397 &self.inner.probabilistic.sketches,
398 "probabilistic sketch store",
399 );
400 let sketch = sketches
401 .get_mut(name)
402 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
403 sketch.add(element.as_bytes(), *count);
404 Ok(RuntimeQueryResult::ok_message(
405 raw_query.to_string(),
406 &format!("added {} to SKETCH '{}'", count, name),
407 "insert",
408 ))
409 }
410 ProbabilisticCommand::SketchCount { name, element } => {
411 let sketches = probabilistic_read(
412 &self.inner.probabilistic.sketches,
413 "probabilistic sketch store",
414 );
415 let sketch = sketches
416 .get(name)
417 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
418 let estimate = sketch.estimate(element.as_bytes());
419 let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
420 let mut record = UnifiedRecord::new();
421 record.set("estimate", Value::UnsignedInteger(estimate));
422 result.push(record);
423 Ok(RuntimeQueryResult {
424 query: raw_query.to_string(),
425 mode: QueryMode::Sql,
426 statement: "sketch_count",
427 engine: "runtime-probabilistic",
428 result,
429 affected_rows: 0,
430 statement_type: "select",
431 })
432 }
433 ProbabilisticCommand::SketchMerge { dest, sources } => {
434 let mut sketches = probabilistic_write(
435 &self.inner.probabilistic.sketches,
436 "probabilistic sketch store",
437 );
438 let first_src = sketches.get(&sources[0]).ok_or_else(|| {
439 RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
440 })?;
441 let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
442 first_src.width(),
443 first_src.depth(),
444 );
445 for src in sources {
446 let sketch = sketches.get(src).ok_or_else(|| {
447 RedDBError::NotFound(format!("SKETCH '{}' not found", src))
448 })?;
449 if !merged.merge(sketch) {
450 return Err(RedDBError::Query(format!(
451 "SKETCH '{}' has incompatible dimensions",
452 src
453 )));
454 }
455 }
456 sketches.insert(dest.clone(), merged);
457 Ok(RuntimeQueryResult::ok_message(
458 raw_query.to_string(),
459 &format!(
460 "SKETCH '{}' created from merge of {}",
461 dest,
462 sources.join(", ")
463 ),
464 "create",
465 ))
466 }
467 ProbabilisticCommand::SketchInfo { name } => {
468 let sketches = probabilistic_read(
469 &self.inner.probabilistic.sketches,
470 "probabilistic sketch store",
471 );
472 let sketch = sketches
473 .get(name)
474 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
475 let mut result = UnifiedResult::with_columns(vec![
476 "name".into(),
477 "width".into(),
478 "depth".into(),
479 "total".into(),
480 "memory_bytes".into(),
481 ]);
482 let mut record = UnifiedRecord::new();
483 record.set("name", Value::text(name.clone()));
484 record.set("width", Value::UnsignedInteger(sketch.width() as u64));
485 record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
486 record.set("total", Value::UnsignedInteger(sketch.total()));
487 record.set(
488 "memory_bytes",
489 Value::UnsignedInteger(sketch.memory_bytes() as u64),
490 );
491 result.push(record);
492 Ok(RuntimeQueryResult {
493 query: raw_query.to_string(),
494 mode: QueryMode::Sql,
495 statement: "sketch_info",
496 engine: "runtime-probabilistic",
497 result,
498 affected_rows: 0,
499 statement_type: "select",
500 })
501 }
502 ProbabilisticCommand::DropSketch { name, if_exists } => {
503 let mut sketches = probabilistic_write(
504 &self.inner.probabilistic.sketches,
505 "probabilistic sketch store",
506 );
507 if sketches.remove(name).is_none() {
508 if *if_exists {
509 return Ok(RuntimeQueryResult::ok_message(
510 raw_query.to_string(),
511 &format!("SKETCH '{}' does not exist", name),
512 "drop",
513 ));
514 }
515 return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
516 }
517 self.drop_probabilistic_catalog_entry(name)?;
518 Ok(RuntimeQueryResult::ok_message(
519 raw_query.to_string(),
520 &format!("SKETCH '{}' dropped", name),
521 "drop",
522 ))
523 }
524
525 ProbabilisticCommand::CreateFilter {
527 name,
528 capacity,
529 if_not_exists,
530 } => {
531 let mut filters = probabilistic_write(
532 &self.inner.probabilistic.filters,
533 "probabilistic filter store",
534 );
535 if filters.contains_key(name) {
536 if *if_not_exists {
537 return Ok(RuntimeQueryResult::ok_message(
538 raw_query.to_string(),
539 &format!("FILTER '{}' already exists", name),
540 "create",
541 ));
542 }
543 return Err(RedDBError::Query(format!(
544 "FILTER '{}' already exists",
545 name
546 )));
547 }
548 self.create_probabilistic_catalog_entry(
549 name,
550 crate::catalog::CollectionModel::Filter,
551 )?;
552 filters.insert(
553 name.clone(),
554 crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity),
555 );
556 Ok(RuntimeQueryResult::ok_message(
557 raw_query.to_string(),
558 &format!("FILTER '{}' created (capacity={})", name, capacity),
559 "create",
560 ))
561 }
562 ProbabilisticCommand::FilterAdd { name, element } => {
563 let mut filters = probabilistic_write(
564 &self.inner.probabilistic.filters,
565 "probabilistic filter store",
566 );
567 let filter = filters
568 .get_mut(name)
569 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
570 if !filter.insert(element.as_bytes()) {
571 return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
572 }
573 Ok(RuntimeQueryResult::ok_message(
574 raw_query.to_string(),
575 &format!("element added to FILTER '{}'", name),
576 "insert",
577 ))
578 }
579 ProbabilisticCommand::FilterCheck { name, element } => {
580 let filters = probabilistic_read(
581 &self.inner.probabilistic.filters,
582 "probabilistic filter store",
583 );
584 let filter = filters
585 .get(name)
586 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
587 let exists = filter.contains(element.as_bytes());
588 let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
589 let mut record = UnifiedRecord::new();
590 record.set("exists", Value::Boolean(exists));
591 result.push(record);
592 Ok(RuntimeQueryResult {
593 query: raw_query.to_string(),
594 mode: QueryMode::Sql,
595 statement: "filter_check",
596 engine: "runtime-probabilistic",
597 result,
598 affected_rows: 0,
599 statement_type: "select",
600 })
601 }
602 ProbabilisticCommand::FilterDelete { name, element } => {
603 let mut filters = probabilistic_write(
604 &self.inner.probabilistic.filters,
605 "probabilistic filter store",
606 );
607 let filter = filters
608 .get_mut(name)
609 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
610 let removed = filter.delete(element.as_bytes());
611 Ok(RuntimeQueryResult::ok_message(
612 raw_query.to_string(),
613 &format!(
614 "element {} from FILTER '{}'",
615 if removed { "deleted" } else { "not found in" },
616 name
617 ),
618 "delete",
619 ))
620 }
621 ProbabilisticCommand::FilterCount { name } => {
622 let filters = probabilistic_read(
623 &self.inner.probabilistic.filters,
624 "probabilistic filter store",
625 );
626 let filter = filters
627 .get(name)
628 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
629 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
630 let mut record = UnifiedRecord::new();
631 record.set("count", Value::UnsignedInteger(filter.count() as u64));
632 result.push(record);
633 Ok(RuntimeQueryResult {
634 query: raw_query.to_string(),
635 mode: QueryMode::Sql,
636 statement: "filter_count",
637 engine: "runtime-probabilistic",
638 result,
639 affected_rows: 0,
640 statement_type: "select",
641 })
642 }
643 ProbabilisticCommand::FilterInfo { name } => {
644 let filters = probabilistic_read(
645 &self.inner.probabilistic.filters,
646 "probabilistic filter store",
647 );
648 let filter = filters
649 .get(name)
650 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
651 let mut result = UnifiedResult::with_columns(vec![
652 "name".into(),
653 "count".into(),
654 "load_factor".into(),
655 "memory_bytes".into(),
656 ]);
657 let mut record = UnifiedRecord::new();
658 record.set("name", Value::text(name.clone()));
659 record.set("count", Value::UnsignedInteger(filter.count() as u64));
660 record.set("load_factor", Value::Float(filter.load_factor()));
661 record.set(
662 "memory_bytes",
663 Value::UnsignedInteger(filter.memory_bytes() as u64),
664 );
665 result.push(record);
666 Ok(RuntimeQueryResult {
667 query: raw_query.to_string(),
668 mode: QueryMode::Sql,
669 statement: "filter_info",
670 engine: "runtime-probabilistic",
671 result,
672 affected_rows: 0,
673 statement_type: "select",
674 })
675 }
676 ProbabilisticCommand::DropFilter { name, if_exists } => {
677 let mut filters = probabilistic_write(
678 &self.inner.probabilistic.filters,
679 "probabilistic filter store",
680 );
681 if filters.remove(name).is_none() {
682 if *if_exists {
683 return Ok(RuntimeQueryResult::ok_message(
684 raw_query.to_string(),
685 &format!("FILTER '{}' does not exist", name),
686 "drop",
687 ));
688 }
689 return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
690 }
691 self.drop_probabilistic_catalog_entry(name)?;
692 Ok(RuntimeQueryResult::ok_message(
693 raw_query.to_string(),
694 &format!("FILTER '{}' dropped", name),
695 "drop",
696 ))
697 }
698 }
699 }
700}
701
702fn parse_probabilistic_read_projection(
703 projection: &Projection,
704 index: usize,
705) -> RedDBResult<Option<ProbabilisticReadProjection>> {
706 if let Some(column) = projection_unqualified_column(projection) {
707 if column.eq_ignore_ascii_case("CARDINALITY") {
708 return Ok(Some(ProbabilisticReadProjection::Cardinality {
709 label: probabilistic_projection_label(projection, "cardinality", index),
710 }));
711 }
712 }
713
714 let Some((function, args)) = projection_function(projection) else {
715 return Ok(None);
716 };
717 if function.eq_ignore_ascii_case("FREQ") {
718 let element = projection_single_text_arg(function, args)?;
719 return Ok(Some(ProbabilisticReadProjection::Freq {
720 element,
721 label: probabilistic_projection_label(projection, "freq", index),
722 }));
723 }
724 if function.eq_ignore_ascii_case("CONTAINS") {
725 let element = projection_single_text_arg(function, args)?;
726 return Ok(Some(ProbabilisticReadProjection::Contains {
727 element,
728 label: probabilistic_projection_label(projection, "contains", index),
729 }));
730 }
731
732 Ok(None)
733}
734
735fn validate_probabilistic_read_model(
736 collection: &str,
737 actual_model: crate::catalog::CollectionModel,
738 projections: &[ProbabilisticReadProjection],
739) -> RedDBResult<()> {
740 for projection in projections {
741 let expected_model = match projection {
742 ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
743 ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
744 ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
745 };
746 if actual_model != expected_model {
747 return Err(RedDBError::Query(format!(
748 "{} is only supported for {} collections; '{}' is {}",
749 probabilistic_projection_form(projection),
750 crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
751 collection,
752 crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
753 )));
754 }
755 }
756 Ok(())
757}
758
759impl RedDBRuntime {
760 fn materialize_probabilistic_select_row(
761 &self,
762 collection: &str,
763 projections: &[ProbabilisticReadProjection],
764 ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
765 let mut columns = Vec::with_capacity(projections.len());
766 let mut record = UnifiedRecord::new();
767 for projection in projections {
768 match projection {
769 ProbabilisticReadProjection::Cardinality { label } => {
770 let hlls = probabilistic_read(
771 &self.inner.probabilistic.hlls,
772 "probabilistic HLL store",
773 );
774 let hll = hlls.get(collection).ok_or_else(|| {
775 RedDBError::NotFound(format!("HLL '{}' not found", collection))
776 })?;
777 columns.push(label.clone());
778 record.set(label, Value::UnsignedInteger(hll.count()));
779 }
780 ProbabilisticReadProjection::Freq { element, label } => {
781 let sketches = probabilistic_read(
782 &self.inner.probabilistic.sketches,
783 "probabilistic sketch store",
784 );
785 let sketch = sketches.get(collection).ok_or_else(|| {
786 RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
787 })?;
788 columns.push(label.clone());
789 record.set(
790 label,
791 Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
792 );
793 }
794 ProbabilisticReadProjection::Contains { element, label } => {
795 let filters = probabilistic_read(
796 &self.inner.probabilistic.filters,
797 "probabilistic filter store",
798 );
799 let filter = filters.get(collection).ok_or_else(|| {
800 RedDBError::NotFound(format!("FILTER '{}' not found", collection))
801 })?;
802 columns.push(label.clone());
803 record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
804 }
805 }
806 }
807 Ok((columns, record))
808 }
809}
810
811fn probabilistic_select_row_visible(
812 runtime: &RedDBRuntime,
813 query: &TableQuery,
814 record: &UnifiedRecord,
815) -> bool {
816 if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
817 return false;
818 }
819 let table_name = query.table.as_str();
820 let table_alias = query.alias.as_deref().unwrap_or(table_name);
821 crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
822 super::join_filter::evaluate_runtime_filter_with_db(
823 Some(&runtime.inner.db),
824 record,
825 &filter,
826 Some(table_name),
827 Some(table_alias),
828 )
829 })
830}
831
832fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
833 match projection {
834 Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
835 Some(column.as_str())
836 }
837 Projection::Column(column) => Some(column.as_str()),
838 Projection::Alias(column, _) => Some(column.as_str()),
839 _ => None,
840 }
841}
842
843fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
844 match projection {
845 Projection::Function(name, args) => {
846 let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
847 Some((function, args.as_slice()))
848 }
849 _ => None,
850 }
851}
852
853fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
854 if args.len() != 1 {
855 return Err(RedDBError::Query(format!(
856 "{function}(...) expects exactly one string literal"
857 )));
858 }
859 match &args[0] {
860 Projection::Column(column) => column
861 .strip_prefix("LIT:")
862 .map(ToString::to_string)
863 .ok_or_else(|| {
864 RedDBError::Query(format!("{function}(...) expects a string literal argument"))
865 }),
866 _ => Err(RedDBError::Query(format!(
867 "{function}(...) expects a string literal argument"
868 ))),
869 }
870}
871
872fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
873 match projection {
874 Projection::Field(_, Some(alias)) => alias.clone(),
875 Projection::Alias(_, alias) => alias.clone(),
876 Projection::Function(name, _) => name
877 .split_once(':')
878 .map(|(_, alias)| alias.to_string())
879 .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
880 _ => numbered_probabilistic_label(base, index),
881 }
882}
883
/// Produces a default label from `base` and a zero-based `index`.
///
/// Index `0` yields `base` unchanged; any later index yields
/// `"{base}_{index + 1}"` (so index `1` becomes `base_2`).
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        position => format!("{base}_{}", position + 1),
    }
}
891
892fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
893 match projection {
894 ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
895 ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
896 ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
897 }
898}