1use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
6const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
7const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
8const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11 lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15 lock.write()
16}
17
18fn probabilistic_collection_contract(
19 name: &str,
20 model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22 let now = crate::utils::now_unix_millis() as u128;
23 crate::physical::CollectionContract {
24 name: name.to_string(),
25 declared_model: model,
26 schema_mode: crate::catalog::SchemaMode::Dynamic,
27 origin: crate::physical::ContractOrigin::Explicit,
28 version: 1,
29 created_at_unix_ms: now,
30 updated_at_unix_ms: now,
31 default_ttl_ms: None,
32 vector_dimension: None,
33 vector_metric: None,
34 context_index_fields: Vec::new(),
35 declared_columns: Vec::new(),
36 table_def: None,
37 timestamps_enabled: false,
38 context_index_enabled: false,
39 metrics_raw_retention_ms: None,
40 metrics_rollup_policies: Vec::new(),
41 metrics_tenant_identity: None,
42 metrics_namespace: None,
43 append_only: false,
44 subscriptions: Vec::new(),
45 analytics_config: Vec::new(),
46 session_key: None,
47 session_gap_ms: None,
48 retention_duration_ms: None,
49 analytical_storage: None,
50 }
51}
52
/// A projection in a `SELECT` against a probabilistic collection, already
/// resolved to the read form it represents and the output column label to use.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProbabilisticReadProjection {
    /// `CARDINALITY`: the estimated distinct count of an HLL collection.
    Cardinality { label: String },
    /// `FREQ('element')`: the estimated frequency of `element` in a sketch.
    Freq { element: String, label: String },
    /// `CONTAINS('element')`: membership test of `element` against a filter.
    Contains { element: String, label: String },
}
58
59impl RedDBRuntime {
60 pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
61 {
62 let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
63 let mut hlls =
64 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
65 for (name, data_hex) in entries {
66 let bytes = hex::decode(&data_hex).map_err(|err| {
67 RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
68 })?;
69 let Some(hll) =
70 crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
71 else {
72 return Err(RedDBError::Internal(format!(
73 "invalid persisted HLL state for '{name}'"
74 )));
75 };
76 hlls.insert(name, hll);
77 }
78 }
79
80 {
81 let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
82 let mut sketches = probabilistic_write(
83 &self.inner.probabilistic.sketches,
84 "probabilistic sketch store",
85 );
86 for (name, data_hex) in entries {
87 let bytes = hex::decode(&data_hex).map_err(|err| {
88 RedDBError::Internal(format!(
89 "invalid persisted SKETCH state for '{name}': {err}"
90 ))
91 })?;
92 let sketch =
93 crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
94 &bytes,
95 )
96 .ok_or_else(|| {
97 RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
98 })?;
99 sketches.insert(name, sketch);
100 }
101 }
102
103 {
104 let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
105 let mut filters = probabilistic_write(
106 &self.inner.probabilistic.filters,
107 "probabilistic filter store",
108 );
109 for (name, data_hex) in entries {
110 let bytes = hex::decode(&data_hex).map_err(|err| {
111 RedDBError::Internal(format!(
112 "invalid persisted FILTER state for '{name}': {err}"
113 ))
114 })?;
115 let filter =
116 crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
117 .ok_or_else(|| {
118 RedDBError::Internal(format!(
119 "invalid persisted FILTER state for '{name}'"
120 ))
121 })?;
122 filters.insert(name, filter);
123 }
124 }
125
126 Ok(())
127 }
128
129 fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
130 let Some(manager) = self.inner.db.store().get_collection("red_config") else {
131 return Vec::new();
132 };
133 let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
134 std::collections::HashMap::new();
135 for entity in manager.query_all(|_| true) {
136 let EntityData::Row(row) = &entity.data else {
137 continue;
138 };
139 let Some(named) = &row.named else {
140 continue;
141 };
142 let Some(Value::Text(key)) = named.get("key") else {
143 continue;
144 };
145 let Some(encoded_name) = key.strip_prefix(prefix) else {
146 continue;
147 };
148 let value = match named.get("value") {
149 Some(Value::Text(value)) => Some(value.to_string()),
150 Some(Value::Null) => None,
151 _ => continue,
152 };
153 let entity_id = entity.id.raw();
154 match latest.get(encoded_name) {
155 Some((existing_id, _)) if *existing_id > entity_id => {}
156 _ => {
157 latest.insert(encoded_name.to_string(), (entity_id, value));
158 }
159 }
160 }
161
162 latest
163 .into_iter()
164 .filter_map(|(encoded_name, (_, value))| {
165 let value = value?;
166 let bytes = hex::decode(encoded_name).ok()?;
167 let name = String::from_utf8(bytes).ok()?;
168 Some((name, value))
169 })
170 .collect()
171 }
172
173 fn persist_probabilistic_blob(
174 &self,
175 prefix: &str,
176 name: &str,
177 bytes: &[u8],
178 ) -> RedDBResult<()> {
179 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
180 self.inner
181 .db
182 .store()
183 .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
184 Ok(())
185 }
186
187 fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
188 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
189 self.inner
190 .db
191 .store()
192 .set_config_tree(&key, &crate::serde_json::Value::Null);
193 Ok(())
194 }
195
196 fn create_probabilistic_catalog_entry(
197 &self,
198 name: &str,
199 model: crate::catalog::CollectionModel,
200 ) -> RedDBResult<()> {
201 let store = self.inner.db.store();
202 store
203 .create_collection(name)
204 .map_err(|err| RedDBError::Internal(err.to_string()))?;
205 self.inner
206 .db
207 .save_collection_contract(probabilistic_collection_contract(name, model))
208 .map_err(|err| RedDBError::Internal(err.to_string()))?;
209 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
210 store.set_config_tree(
211 &format!("red.collection_tenants.{name}"),
212 &crate::serde_json::Value::String(tenant_id),
213 );
214 }
215 self.inner
216 .db
217 .persist_metadata()
218 .map_err(|err| RedDBError::Internal(err.to_string()))?;
219 self.invalidate_result_cache();
220 Ok(())
221 }
222
223 fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
224 let store = self.inner.db.store();
225 if store.get_collection(name).is_some() {
226 store
227 .drop_collection(name)
228 .map_err(|err| RedDBError::Internal(err.to_string()))?;
229 }
230 self.inner
231 .db
232 .remove_collection_contract(name)
233 .map_err(|err| RedDBError::Internal(err.to_string()))?;
234 self.inner
235 .db
236 .persist_metadata()
237 .map_err(|err| RedDBError::Internal(err.to_string()))?;
238 self.invalidate_result_cache();
239 Ok(())
240 }
241
242 pub(crate) fn execute_probabilistic_select(
243 &self,
244 query: &TableQuery,
245 ) -> RedDBResult<Option<UnifiedResult>> {
246 let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
247 let mut read_projections = Vec::new();
248 for projection in &projections {
249 if let Some(read_projection) =
250 parse_probabilistic_read_projection(projection, read_projections.len())?
251 {
252 read_projections.push(read_projection);
253 }
254 }
255
256 let Some(actual_model) = self
257 .inner
258 .db
259 .collection_contract(&query.table)
260 .map(|contract| contract.declared_model)
261 else {
262 return if read_projections.is_empty() {
263 Ok(None)
264 } else {
265 Err(RedDBError::NotFound(format!(
266 "probabilistic collection '{}' not found",
267 query.table
268 )))
269 };
270 };
271
272 let is_probabilistic_model = matches!(
273 actual_model,
274 crate::catalog::CollectionModel::Hll
275 | crate::catalog::CollectionModel::Sketch
276 | crate::catalog::CollectionModel::Filter
277 );
278 if read_projections.is_empty() {
279 return if is_probabilistic_model {
280 Err(RedDBError::Query(format!(
281 "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
282 query.table
283 )))
284 } else {
285 Ok(None)
286 };
287 }
288
289 validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
290 let (columns, record) =
291 self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
292 let mut result = UnifiedResult::with_columns(columns);
293 if probabilistic_select_row_visible(self, query, &record) {
294 result.push(record);
295 }
296 Ok(Some(result))
297 }
298
299 pub fn execute_probabilistic_command(
300 &self,
301 raw_query: &str,
302 cmd: &ProbabilisticCommand,
303 ) -> RedDBResult<RuntimeQueryResult> {
304 let is_mutation = matches!(
308 cmd,
309 ProbabilisticCommand::CreateHll { .. }
310 | ProbabilisticCommand::HllAdd { .. }
311 | ProbabilisticCommand::HllMerge { .. }
312 | ProbabilisticCommand::DropHll { .. }
313 | ProbabilisticCommand::CreateSketch { .. }
314 | ProbabilisticCommand::SketchAdd { .. }
315 | ProbabilisticCommand::SketchMerge { .. }
316 | ProbabilisticCommand::DropSketch { .. }
317 | ProbabilisticCommand::CreateFilter { .. }
318 | ProbabilisticCommand::FilterAdd { .. }
319 | ProbabilisticCommand::FilterDelete { .. }
320 | ProbabilisticCommand::DropFilter { .. }
321 );
322 if is_mutation {
323 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
324 }
325 match cmd {
326 ProbabilisticCommand::CreateHll {
328 name,
329 precision,
330 if_not_exists,
331 } => {
332 let mut hlls =
333 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
334 if hlls.contains_key(name) {
335 if *if_not_exists {
336 return Ok(RuntimeQueryResult::ok_message(
337 raw_query.to_string(),
338 &format!("HLL '{}' already exists", name),
339 "create",
340 ));
341 }
342 return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
343 }
344 let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
345 *precision,
346 )
347 .ok_or_else(|| {
348 RedDBError::Query(format!(
349 "HLL precision must be between 4 and 18, got {precision}"
350 ))
351 })?;
352 self.create_probabilistic_catalog_entry(
353 name,
354 crate::catalog::CollectionModel::Hll,
355 )?;
356 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
357 hlls.insert(name.clone(), hll);
358 Ok(RuntimeQueryResult::ok_message(
359 raw_query.to_string(),
360 &format!("HLL '{}' created", name),
361 "create",
362 ))
363 }
364 ProbabilisticCommand::HllAdd { name, elements } => {
365 let mut hlls =
366 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
367 let hll = hlls
368 .get_mut(name)
369 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
370 for elem in elements {
371 hll.add(elem.as_bytes());
372 }
373 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
374 Ok(RuntimeQueryResult::ok_message(
375 raw_query.to_string(),
376 &format!("{} element(s) added to HLL '{}'", elements.len(), name),
377 "insert",
378 ))
379 }
380 ProbabilisticCommand::HllCount { names } => {
381 let hlls =
382 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
383 if names.len() == 1 {
384 let hll = hlls.get(&names[0]).ok_or_else(|| {
385 RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
386 })?;
387 let count = hll.count();
388 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
389 let mut record = UnifiedRecord::new();
390 record.set("count", Value::UnsignedInteger(count));
391 result.push(record);
392 Ok(RuntimeQueryResult {
393 query: raw_query.to_string(),
394 mode: QueryMode::Sql,
395 statement: "hll_count",
396 engine: "runtime-probabilistic",
397 result,
398 affected_rows: 0,
399 statement_type: "select",
400 bookmark: None,
401 })
402 } else {
403 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
405 for name in names {
406 let hll = hlls.get(name).ok_or_else(|| {
407 RedDBError::NotFound(format!("HLL '{}' not found", name))
408 })?;
409 merged.merge(hll);
410 }
411 let count = merged.count();
412 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
413 let mut record = UnifiedRecord::new();
414 record.set("count", Value::UnsignedInteger(count));
415 result.push(record);
416 Ok(RuntimeQueryResult {
417 query: raw_query.to_string(),
418 mode: QueryMode::Sql,
419 statement: "hll_count",
420 engine: "runtime-probabilistic",
421 result,
422 affected_rows: 0,
423 statement_type: "select",
424 bookmark: None,
425 })
426 }
427 }
428 ProbabilisticCommand::HllMerge { dest, sources } => {
429 let mut hlls =
430 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
431 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
432 for src in sources {
433 let hll = hlls
434 .get(src)
435 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
436 merged.merge(hll);
437 }
438 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
439 hlls.insert(dest.clone(), merged);
440 Ok(RuntimeQueryResult::ok_message(
441 raw_query.to_string(),
442 &format!(
443 "HLL '{}' created from merge of {}",
444 dest,
445 sources.join(", ")
446 ),
447 "create",
448 ))
449 }
450 ProbabilisticCommand::HllInfo { name } => {
451 let hlls =
452 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
453 let hll = hlls
454 .get(name)
455 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
456 let mut result = UnifiedResult::with_columns(vec![
457 "name".into(),
458 "precision".into(),
459 "count".into(),
460 "memory_bytes".into(),
461 ]);
462 let mut record = UnifiedRecord::new();
463 record.set("name", Value::text(name.clone()));
464 record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
465 record.set("count", Value::UnsignedInteger(hll.count()));
466 record.set(
467 "memory_bytes",
468 Value::UnsignedInteger(hll.memory_bytes() as u64),
469 );
470 result.push(record);
471 Ok(RuntimeQueryResult {
472 query: raw_query.to_string(),
473 mode: QueryMode::Sql,
474 statement: "hll_info",
475 engine: "runtime-probabilistic",
476 result,
477 affected_rows: 0,
478 statement_type: "select",
479 bookmark: None,
480 })
481 }
482 ProbabilisticCommand::DropHll { name, if_exists } => {
483 let mut hlls =
484 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
485 if hlls.remove(name).is_none() {
486 if *if_exists {
487 return Ok(RuntimeQueryResult::ok_message(
488 raw_query.to_string(),
489 &format!("HLL '{}' does not exist", name),
490 "drop",
491 ));
492 }
493 return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
494 }
495 self.drop_probabilistic_catalog_entry(name)?;
496 self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
497 Ok(RuntimeQueryResult::ok_message(
498 raw_query.to_string(),
499 &format!("HLL '{}' dropped", name),
500 "drop",
501 ))
502 }
503
504 ProbabilisticCommand::CreateSketch {
506 name,
507 width,
508 depth,
509 if_not_exists,
510 } => {
511 let mut sketches = probabilistic_write(
512 &self.inner.probabilistic.sketches,
513 "probabilistic sketch store",
514 );
515 if sketches.contains_key(name) {
516 if *if_not_exists {
517 return Ok(RuntimeQueryResult::ok_message(
518 raw_query.to_string(),
519 &format!("SKETCH '{}' already exists", name),
520 "create",
521 ));
522 }
523 return Err(RedDBError::Query(format!(
524 "SKETCH '{}' already exists",
525 name
526 )));
527 }
528 self.create_probabilistic_catalog_entry(
529 name,
530 crate::catalog::CollectionModel::Sketch,
531 )?;
532 let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
533 *width, *depth,
534 );
535 self.persist_probabilistic_blob(
536 PROB_SKETCH_STATE_PREFIX,
537 name,
538 &sketch.as_bytes(),
539 )?;
540 sketches.insert(name.clone(), sketch);
541 Ok(RuntimeQueryResult::ok_message(
542 raw_query.to_string(),
543 &format!(
544 "SKETCH '{}' created (width={}, depth={})",
545 name, width, depth
546 ),
547 "create",
548 ))
549 }
550 ProbabilisticCommand::SketchAdd {
551 name,
552 element,
553 count,
554 } => {
555 let mut sketches = probabilistic_write(
556 &self.inner.probabilistic.sketches,
557 "probabilistic sketch store",
558 );
559 let sketch = sketches
560 .get_mut(name)
561 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
562 sketch.add(element.as_bytes(), *count);
563 self.persist_probabilistic_blob(
564 PROB_SKETCH_STATE_PREFIX,
565 name,
566 &sketch.as_bytes(),
567 )?;
568 Ok(RuntimeQueryResult::ok_message(
569 raw_query.to_string(),
570 &format!("added {} to SKETCH '{}'", count, name),
571 "insert",
572 ))
573 }
574 ProbabilisticCommand::SketchCount { name, element } => {
575 let sketches = probabilistic_read(
576 &self.inner.probabilistic.sketches,
577 "probabilistic sketch store",
578 );
579 let sketch = sketches
580 .get(name)
581 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
582 let estimate = sketch.estimate(element.as_bytes());
583 let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
584 let mut record = UnifiedRecord::new();
585 record.set("estimate", Value::UnsignedInteger(estimate));
586 result.push(record);
587 Ok(RuntimeQueryResult {
588 query: raw_query.to_string(),
589 mode: QueryMode::Sql,
590 statement: "sketch_count",
591 engine: "runtime-probabilistic",
592 result,
593 affected_rows: 0,
594 statement_type: "select",
595 bookmark: None,
596 })
597 }
598 ProbabilisticCommand::SketchMerge { dest, sources } => {
599 let mut sketches = probabilistic_write(
600 &self.inner.probabilistic.sketches,
601 "probabilistic sketch store",
602 );
603 let first_src = sketches.get(&sources[0]).ok_or_else(|| {
604 RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
605 })?;
606 let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
607 first_src.width(),
608 first_src.depth(),
609 );
610 for src in sources {
611 let sketch = sketches.get(src).ok_or_else(|| {
612 RedDBError::NotFound(format!("SKETCH '{}' not found", src))
613 })?;
614 if !merged.merge(sketch) {
615 return Err(RedDBError::Query(format!(
616 "SKETCH '{}' has incompatible dimensions",
617 src
618 )));
619 }
620 }
621 self.persist_probabilistic_blob(
622 PROB_SKETCH_STATE_PREFIX,
623 dest,
624 &merged.as_bytes(),
625 )?;
626 sketches.insert(dest.clone(), merged);
627 Ok(RuntimeQueryResult::ok_message(
628 raw_query.to_string(),
629 &format!(
630 "SKETCH '{}' created from merge of {}",
631 dest,
632 sources.join(", ")
633 ),
634 "create",
635 ))
636 }
637 ProbabilisticCommand::SketchInfo { name } => {
638 let sketches = probabilistic_read(
639 &self.inner.probabilistic.sketches,
640 "probabilistic sketch store",
641 );
642 let sketch = sketches
643 .get(name)
644 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
645 let mut result = UnifiedResult::with_columns(vec![
646 "name".into(),
647 "width".into(),
648 "depth".into(),
649 "total".into(),
650 "memory_bytes".into(),
651 ]);
652 let mut record = UnifiedRecord::new();
653 record.set("name", Value::text(name.clone()));
654 record.set("width", Value::UnsignedInteger(sketch.width() as u64));
655 record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
656 record.set("total", Value::UnsignedInteger(sketch.total()));
657 record.set(
658 "memory_bytes",
659 Value::UnsignedInteger(sketch.memory_bytes() as u64),
660 );
661 result.push(record);
662 Ok(RuntimeQueryResult {
663 query: raw_query.to_string(),
664 mode: QueryMode::Sql,
665 statement: "sketch_info",
666 engine: "runtime-probabilistic",
667 result,
668 affected_rows: 0,
669 statement_type: "select",
670 bookmark: None,
671 })
672 }
673 ProbabilisticCommand::DropSketch { name, if_exists } => {
674 let mut sketches = probabilistic_write(
675 &self.inner.probabilistic.sketches,
676 "probabilistic sketch store",
677 );
678 if sketches.remove(name).is_none() {
679 if *if_exists {
680 return Ok(RuntimeQueryResult::ok_message(
681 raw_query.to_string(),
682 &format!("SKETCH '{}' does not exist", name),
683 "drop",
684 ));
685 }
686 return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
687 }
688 self.drop_probabilistic_catalog_entry(name)?;
689 self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
690 Ok(RuntimeQueryResult::ok_message(
691 raw_query.to_string(),
692 &format!("SKETCH '{}' dropped", name),
693 "drop",
694 ))
695 }
696
697 ProbabilisticCommand::CreateFilter {
699 name,
700 capacity,
701 if_not_exists,
702 } => {
703 let mut filters = probabilistic_write(
704 &self.inner.probabilistic.filters,
705 "probabilistic filter store",
706 );
707 if filters.contains_key(name) {
708 if *if_not_exists {
709 return Ok(RuntimeQueryResult::ok_message(
710 raw_query.to_string(),
711 &format!("FILTER '{}' already exists", name),
712 "create",
713 ));
714 }
715 return Err(RedDBError::Query(format!(
716 "FILTER '{}' already exists",
717 name
718 )));
719 }
720 self.create_probabilistic_catalog_entry(
721 name,
722 crate::catalog::CollectionModel::Filter,
723 )?;
724 let filter =
725 crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
726 self.persist_probabilistic_blob(
727 PROB_FILTER_STATE_PREFIX,
728 name,
729 &filter.as_bytes(),
730 )?;
731 filters.insert(name.clone(), filter);
732 Ok(RuntimeQueryResult::ok_message(
733 raw_query.to_string(),
734 &format!("FILTER '{}' created (capacity={})", name, capacity),
735 "create",
736 ))
737 }
738 ProbabilisticCommand::FilterAdd { name, element } => {
739 let mut filters = probabilistic_write(
740 &self.inner.probabilistic.filters,
741 "probabilistic filter store",
742 );
743 let filter = filters
744 .get_mut(name)
745 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
746 if !filter.insert(element.as_bytes()) {
747 return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
748 }
749 self.persist_probabilistic_blob(
750 PROB_FILTER_STATE_PREFIX,
751 name,
752 &filter.as_bytes(),
753 )?;
754 Ok(RuntimeQueryResult::ok_message(
755 raw_query.to_string(),
756 &format!("element added to FILTER '{}'", name),
757 "insert",
758 ))
759 }
760 ProbabilisticCommand::FilterCheck { name, element } => {
761 let filters = probabilistic_read(
762 &self.inner.probabilistic.filters,
763 "probabilistic filter store",
764 );
765 let filter = filters
766 .get(name)
767 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
768 let exists = filter.contains(element.as_bytes());
769 let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
770 let mut record = UnifiedRecord::new();
771 record.set("exists", Value::Boolean(exists));
772 result.push(record);
773 Ok(RuntimeQueryResult {
774 query: raw_query.to_string(),
775 mode: QueryMode::Sql,
776 statement: "filter_check",
777 engine: "runtime-probabilistic",
778 result,
779 affected_rows: 0,
780 statement_type: "select",
781 bookmark: None,
782 })
783 }
784 ProbabilisticCommand::FilterDelete { name, element } => {
785 let mut filters = probabilistic_write(
786 &self.inner.probabilistic.filters,
787 "probabilistic filter store",
788 );
789 let filter = filters
790 .get_mut(name)
791 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
792 let removed = filter.delete(element.as_bytes());
793 self.persist_probabilistic_blob(
794 PROB_FILTER_STATE_PREFIX,
795 name,
796 &filter.as_bytes(),
797 )?;
798 Ok(RuntimeQueryResult::ok_message(
799 raw_query.to_string(),
800 &format!(
801 "element {} from FILTER '{}'",
802 if removed { "deleted" } else { "not found in" },
803 name
804 ),
805 "delete",
806 ))
807 }
808 ProbabilisticCommand::FilterCount { name } => {
809 let filters = probabilistic_read(
810 &self.inner.probabilistic.filters,
811 "probabilistic filter store",
812 );
813 let filter = filters
814 .get(name)
815 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
816 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
817 let mut record = UnifiedRecord::new();
818 record.set("count", Value::UnsignedInteger(filter.count() as u64));
819 result.push(record);
820 Ok(RuntimeQueryResult {
821 query: raw_query.to_string(),
822 mode: QueryMode::Sql,
823 statement: "filter_count",
824 engine: "runtime-probabilistic",
825 result,
826 affected_rows: 0,
827 statement_type: "select",
828 bookmark: None,
829 })
830 }
831 ProbabilisticCommand::FilterInfo { name } => {
832 let filters = probabilistic_read(
833 &self.inner.probabilistic.filters,
834 "probabilistic filter store",
835 );
836 let filter = filters
837 .get(name)
838 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
839 let mut result = UnifiedResult::with_columns(vec![
840 "name".into(),
841 "count".into(),
842 "load_factor".into(),
843 "memory_bytes".into(),
844 ]);
845 let mut record = UnifiedRecord::new();
846 record.set("name", Value::text(name.clone()));
847 record.set("count", Value::UnsignedInteger(filter.count() as u64));
848 record.set("load_factor", Value::Float(filter.load_factor()));
849 record.set(
850 "memory_bytes",
851 Value::UnsignedInteger(filter.memory_bytes() as u64),
852 );
853 result.push(record);
854 Ok(RuntimeQueryResult {
855 query: raw_query.to_string(),
856 mode: QueryMode::Sql,
857 statement: "filter_info",
858 engine: "runtime-probabilistic",
859 result,
860 affected_rows: 0,
861 statement_type: "select",
862 bookmark: None,
863 })
864 }
865 ProbabilisticCommand::DropFilter { name, if_exists } => {
866 let mut filters = probabilistic_write(
867 &self.inner.probabilistic.filters,
868 "probabilistic filter store",
869 );
870 if filters.remove(name).is_none() {
871 if *if_exists {
872 return Ok(RuntimeQueryResult::ok_message(
873 raw_query.to_string(),
874 &format!("FILTER '{}' does not exist", name),
875 "drop",
876 ));
877 }
878 return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
879 }
880 self.drop_probabilistic_catalog_entry(name)?;
881 self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
882 Ok(RuntimeQueryResult::ok_message(
883 raw_query.to_string(),
884 &format!("FILTER '{}' dropped", name),
885 "drop",
886 ))
887 }
888 }
889 }
890}
891
892fn parse_probabilistic_read_projection(
893 projection: &Projection,
894 index: usize,
895) -> RedDBResult<Option<ProbabilisticReadProjection>> {
896 if let Some(column) = projection_unqualified_column(projection) {
897 if column.eq_ignore_ascii_case("CARDINALITY") {
898 return Ok(Some(ProbabilisticReadProjection::Cardinality {
899 label: probabilistic_projection_label(projection, "cardinality", index),
900 }));
901 }
902 }
903
904 let Some((function, args)) = projection_function(projection) else {
905 return Ok(None);
906 };
907 if function.eq_ignore_ascii_case("FREQ") {
908 let element = projection_single_text_arg(function, args)?;
909 return Ok(Some(ProbabilisticReadProjection::Freq {
910 element,
911 label: probabilistic_projection_label(projection, "freq", index),
912 }));
913 }
914 if function.eq_ignore_ascii_case("CONTAINS") {
915 let element = projection_single_text_arg(function, args)?;
916 return Ok(Some(ProbabilisticReadProjection::Contains {
917 element,
918 label: probabilistic_projection_label(projection, "contains", index),
919 }));
920 }
921
922 Ok(None)
923}
924
925fn validate_probabilistic_read_model(
926 collection: &str,
927 actual_model: crate::catalog::CollectionModel,
928 projections: &[ProbabilisticReadProjection],
929) -> RedDBResult<()> {
930 for projection in projections {
931 let expected_model = match projection {
932 ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
933 ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
934 ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
935 };
936 if actual_model != expected_model {
937 return Err(RedDBError::Query(format!(
938 "{} is only supported for {} collections; '{}' is {}",
939 probabilistic_projection_form(projection),
940 crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
941 collection,
942 crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
943 )));
944 }
945 }
946 Ok(())
947}
948
949impl RedDBRuntime {
950 fn materialize_probabilistic_select_row(
951 &self,
952 collection: &str,
953 projections: &[ProbabilisticReadProjection],
954 ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
955 let mut columns = Vec::with_capacity(projections.len());
956 let mut record = UnifiedRecord::new();
957 for projection in projections {
958 match projection {
959 ProbabilisticReadProjection::Cardinality { label } => {
960 let hlls = probabilistic_read(
961 &self.inner.probabilistic.hlls,
962 "probabilistic HLL store",
963 );
964 let hll = hlls.get(collection).ok_or_else(|| {
965 RedDBError::NotFound(format!("HLL '{}' not found", collection))
966 })?;
967 columns.push(label.clone());
968 record.set(label, Value::UnsignedInteger(hll.count()));
969 }
970 ProbabilisticReadProjection::Freq { element, label } => {
971 let sketches = probabilistic_read(
972 &self.inner.probabilistic.sketches,
973 "probabilistic sketch store",
974 );
975 let sketch = sketches.get(collection).ok_or_else(|| {
976 RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
977 })?;
978 columns.push(label.clone());
979 record.set(
980 label,
981 Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
982 );
983 }
984 ProbabilisticReadProjection::Contains { element, label } => {
985 let filters = probabilistic_read(
986 &self.inner.probabilistic.filters,
987 "probabilistic filter store",
988 );
989 let filter = filters.get(collection).ok_or_else(|| {
990 RedDBError::NotFound(format!("FILTER '{}' not found", collection))
991 })?;
992 columns.push(label.clone());
993 record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
994 }
995 }
996 }
997 Ok((columns, record))
998 }
999}
1000
1001fn probabilistic_select_row_visible(
1002 runtime: &RedDBRuntime,
1003 query: &TableQuery,
1004 record: &UnifiedRecord,
1005) -> bool {
1006 if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
1007 return false;
1008 }
1009 let table_name = query.table.as_str();
1010 let table_alias = query.alias.as_deref().unwrap_or(table_name);
1011 crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
1012 super::join_filter::evaluate_runtime_filter_with_db(
1013 Some(&runtime.inner.db),
1014 record,
1015 &filter,
1016 Some(table_name),
1017 Some(table_alias),
1018 )
1019 })
1020}
1021
1022fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1023 match projection {
1024 Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1025 Some(column.as_str())
1026 }
1027 Projection::Column(column) => Some(column.as_str()),
1028 Projection::Alias(column, _) => Some(column.as_str()),
1029 _ => None,
1030 }
1031}
1032
1033fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1034 match projection {
1035 Projection::Function(name, args) => {
1036 let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1037 Some((function, args.as_slice()))
1038 }
1039 _ => None,
1040 }
1041}
1042
1043fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1044 if args.len() != 1 {
1045 return Err(RedDBError::Query(format!(
1046 "{function}(...) expects exactly one string literal"
1047 )));
1048 }
1049 match &args[0] {
1050 Projection::Column(column) => column
1051 .strip_prefix("LIT:")
1052 .map(ToString::to_string)
1053 .ok_or_else(|| {
1054 RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1055 }),
1056 _ => Err(RedDBError::Query(format!(
1057 "{function}(...) expects a string literal argument"
1058 ))),
1059 }
1060}
1061
1062fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1063 match projection {
1064 Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1065 if alias.eq_ignore_ascii_case(column) =>
1066 {
1067 numbered_probabilistic_label(base, index)
1068 }
1069 Projection::Field(_, Some(alias)) => alias.clone(),
1070 Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1071 numbered_probabilistic_label(base, index)
1072 }
1073 Projection::Alias(_, alias) => alias.clone(),
1074 Projection::Function(name, _) => name
1075 .split_once(':')
1076 .map(|(_, alias)| {
1077 if is_generated_probabilistic_function_label(alias, base) {
1078 numbered_probabilistic_label(base, index)
1079 } else {
1080 alias.to_string()
1081 }
1082 })
1083 .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1084 _ => numbered_probabilistic_label(base, index),
1085 }
1086}
1087
/// Reports whether `alias` looks like an auto-generated function label:
/// `base` (compared ASCII case-insensitively) immediately followed by `(`.
///
/// Returns `false`, without panicking, when `alias` is shorter than `base` or
/// `base.len()` does not fall on a character boundary of `alias`.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    let split = base.len();
    match (alias.get(..split), alias.get(split..)) {
        (Some(head), Some(tail)) => head.eq_ignore_ascii_case(base) && tail.starts_with('('),
        _ => false,
    }
}
1094
/// Produces the default label for the projection at zero-based `index`.
///
/// The first projection gets `base` itself; later ones get `base` suffixed
/// with `_` and the one-based position (`base_2`, `base_3`, ...).
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        later => format!("{base}_{}", later + 1),
    }
}
1102
1103fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1104 match projection {
1105 ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1106 ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1107 ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1108 }
1109}