1use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
/// Config-tree key prefix for persisted HyperLogLog state. The full key is this
/// prefix followed by the hex-encoded collection name; the stored value is the
/// hex-encoded serialized HLL (or `Null` once the HLL has been dropped).
const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
/// Config-tree key prefix for persisted count-min sketch state (same key/value
/// encoding as [`PROB_HLL_STATE_PREFIX`]).
const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
/// Config-tree key prefix for persisted cuckoo filter state (same key/value
/// encoding as [`PROB_HLL_STATE_PREFIX`]).
const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11 lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15 lock.write()
16}
17
18fn probabilistic_collection_contract(
19 name: &str,
20 model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22 let now = crate::utils::now_unix_millis() as u128;
23 crate::physical::CollectionContract {
24 name: name.to_string(),
25 declared_model: model,
26 schema_mode: crate::catalog::SchemaMode::Dynamic,
27 origin: crate::physical::ContractOrigin::Explicit,
28 version: 1,
29 created_at_unix_ms: now,
30 updated_at_unix_ms: now,
31 default_ttl_ms: None,
32 vector_dimension: None,
33 vector_metric: None,
34 context_index_fields: Vec::new(),
35 declared_columns: Vec::new(),
36 table_def: None,
37 timestamps_enabled: false,
38 context_index_enabled: false,
39 metrics_raw_retention_ms: None,
40 metrics_rollup_policies: Vec::new(),
41 metrics_tenant_identity: None,
42 metrics_namespace: None,
43 append_only: false,
44 subscriptions: Vec::new(),
45 analytics_config: Vec::new(),
46 session_key: None,
47 session_gap_ms: None,
48 retention_duration_ms: None,
49 }
50}
51
/// One probabilistic read form recognised in a `SELECT` projection list.
///
/// `label` is the output column name the computed value is reported under.
enum ProbabilisticReadProjection {
    /// `SELECT CARDINALITY` — the HLL cardinality estimate (`HyperLogLog::count`).
    Cardinality { label: String },
    /// `FREQ('element')` — the count-min sketch estimate for `element`.
    Freq { element: String, label: String },
    /// `CONTAINS('element')` — the cuckoo filter membership test for `element`.
    Contains { element: String, label: String },
}
57
58impl RedDBRuntime {
59 pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
60 {
61 let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
62 let mut hlls =
63 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
64 for (name, data_hex) in entries {
65 let bytes = hex::decode(&data_hex).map_err(|err| {
66 RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
67 })?;
68 let Some(hll) =
69 crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
70 else {
71 return Err(RedDBError::Internal(format!(
72 "invalid persisted HLL state for '{name}'"
73 )));
74 };
75 hlls.insert(name, hll);
76 }
77 }
78
79 {
80 let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
81 let mut sketches = probabilistic_write(
82 &self.inner.probabilistic.sketches,
83 "probabilistic sketch store",
84 );
85 for (name, data_hex) in entries {
86 let bytes = hex::decode(&data_hex).map_err(|err| {
87 RedDBError::Internal(format!(
88 "invalid persisted SKETCH state for '{name}': {err}"
89 ))
90 })?;
91 let sketch =
92 crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
93 &bytes,
94 )
95 .ok_or_else(|| {
96 RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
97 })?;
98 sketches.insert(name, sketch);
99 }
100 }
101
102 {
103 let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
104 let mut filters = probabilistic_write(
105 &self.inner.probabilistic.filters,
106 "probabilistic filter store",
107 );
108 for (name, data_hex) in entries {
109 let bytes = hex::decode(&data_hex).map_err(|err| {
110 RedDBError::Internal(format!(
111 "invalid persisted FILTER state for '{name}': {err}"
112 ))
113 })?;
114 let filter =
115 crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
116 .ok_or_else(|| {
117 RedDBError::Internal(format!(
118 "invalid persisted FILTER state for '{name}'"
119 ))
120 })?;
121 filters.insert(name, filter);
122 }
123 }
124
125 Ok(())
126 }
127
128 fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
129 let Some(manager) = self.inner.db.store().get_collection("red_config") else {
130 return Vec::new();
131 };
132 let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
133 std::collections::HashMap::new();
134 for entity in manager.query_all(|_| true) {
135 let EntityData::Row(row) = &entity.data else {
136 continue;
137 };
138 let Some(named) = &row.named else {
139 continue;
140 };
141 let Some(Value::Text(key)) = named.get("key") else {
142 continue;
143 };
144 let Some(encoded_name) = key.strip_prefix(prefix) else {
145 continue;
146 };
147 let value = match named.get("value") {
148 Some(Value::Text(value)) => Some(value.to_string()),
149 Some(Value::Null) => None,
150 _ => continue,
151 };
152 let entity_id = entity.id.raw();
153 match latest.get(encoded_name) {
154 Some((existing_id, _)) if *existing_id > entity_id => {}
155 _ => {
156 latest.insert(encoded_name.to_string(), (entity_id, value));
157 }
158 }
159 }
160
161 latest
162 .into_iter()
163 .filter_map(|(encoded_name, (_, value))| {
164 let value = value?;
165 let bytes = hex::decode(encoded_name).ok()?;
166 let name = String::from_utf8(bytes).ok()?;
167 Some((name, value))
168 })
169 .collect()
170 }
171
172 fn persist_probabilistic_blob(
173 &self,
174 prefix: &str,
175 name: &str,
176 bytes: &[u8],
177 ) -> RedDBResult<()> {
178 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
179 self.inner
180 .db
181 .store()
182 .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
183 Ok(())
184 }
185
186 fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
187 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
188 self.inner
189 .db
190 .store()
191 .set_config_tree(&key, &crate::serde_json::Value::Null);
192 Ok(())
193 }
194
195 fn create_probabilistic_catalog_entry(
196 &self,
197 name: &str,
198 model: crate::catalog::CollectionModel,
199 ) -> RedDBResult<()> {
200 let store = self.inner.db.store();
201 store
202 .create_collection(name)
203 .map_err(|err| RedDBError::Internal(err.to_string()))?;
204 self.inner
205 .db
206 .save_collection_contract(probabilistic_collection_contract(name, model))
207 .map_err(|err| RedDBError::Internal(err.to_string()))?;
208 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
209 store.set_config_tree(
210 &format!("red.collection_tenants.{name}"),
211 &crate::serde_json::Value::String(tenant_id),
212 );
213 }
214 self.inner
215 .db
216 .persist_metadata()
217 .map_err(|err| RedDBError::Internal(err.to_string()))?;
218 self.invalidate_result_cache();
219 Ok(())
220 }
221
222 fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
223 let store = self.inner.db.store();
224 if store.get_collection(name).is_some() {
225 store
226 .drop_collection(name)
227 .map_err(|err| RedDBError::Internal(err.to_string()))?;
228 }
229 self.inner
230 .db
231 .remove_collection_contract(name)
232 .map_err(|err| RedDBError::Internal(err.to_string()))?;
233 self.inner
234 .db
235 .persist_metadata()
236 .map_err(|err| RedDBError::Internal(err.to_string()))?;
237 self.invalidate_result_cache();
238 Ok(())
239 }
240
241 pub(crate) fn execute_probabilistic_select(
242 &self,
243 query: &TableQuery,
244 ) -> RedDBResult<Option<UnifiedResult>> {
245 let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
246 let mut read_projections = Vec::new();
247 for projection in &projections {
248 if let Some(read_projection) =
249 parse_probabilistic_read_projection(projection, read_projections.len())?
250 {
251 read_projections.push(read_projection);
252 }
253 }
254
255 let Some(actual_model) = self
256 .inner
257 .db
258 .collection_contract(&query.table)
259 .map(|contract| contract.declared_model)
260 else {
261 return if read_projections.is_empty() {
262 Ok(None)
263 } else {
264 Err(RedDBError::NotFound(format!(
265 "probabilistic collection '{}' not found",
266 query.table
267 )))
268 };
269 };
270
271 let is_probabilistic_model = matches!(
272 actual_model,
273 crate::catalog::CollectionModel::Hll
274 | crate::catalog::CollectionModel::Sketch
275 | crate::catalog::CollectionModel::Filter
276 );
277 if read_projections.is_empty() {
278 return if is_probabilistic_model {
279 Err(RedDBError::Query(format!(
280 "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
281 query.table
282 )))
283 } else {
284 Ok(None)
285 };
286 }
287
288 validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
289 let (columns, record) =
290 self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
291 let mut result = UnifiedResult::with_columns(columns);
292 if probabilistic_select_row_visible(self, query, &record) {
293 result.push(record);
294 }
295 Ok(Some(result))
296 }
297
298 pub fn execute_probabilistic_command(
299 &self,
300 raw_query: &str,
301 cmd: &ProbabilisticCommand,
302 ) -> RedDBResult<RuntimeQueryResult> {
303 let is_mutation = matches!(
307 cmd,
308 ProbabilisticCommand::CreateHll { .. }
309 | ProbabilisticCommand::HllAdd { .. }
310 | ProbabilisticCommand::HllMerge { .. }
311 | ProbabilisticCommand::DropHll { .. }
312 | ProbabilisticCommand::CreateSketch { .. }
313 | ProbabilisticCommand::SketchAdd { .. }
314 | ProbabilisticCommand::SketchMerge { .. }
315 | ProbabilisticCommand::DropSketch { .. }
316 | ProbabilisticCommand::CreateFilter { .. }
317 | ProbabilisticCommand::FilterAdd { .. }
318 | ProbabilisticCommand::FilterDelete { .. }
319 | ProbabilisticCommand::DropFilter { .. }
320 );
321 if is_mutation {
322 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
323 }
324 match cmd {
325 ProbabilisticCommand::CreateHll {
327 name,
328 precision,
329 if_not_exists,
330 } => {
331 let mut hlls =
332 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
333 if hlls.contains_key(name) {
334 if *if_not_exists {
335 return Ok(RuntimeQueryResult::ok_message(
336 raw_query.to_string(),
337 &format!("HLL '{}' already exists", name),
338 "create",
339 ));
340 }
341 return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
342 }
343 let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
344 *precision,
345 )
346 .ok_or_else(|| {
347 RedDBError::Query(format!(
348 "HLL precision must be between 4 and 18, got {precision}"
349 ))
350 })?;
351 self.create_probabilistic_catalog_entry(
352 name,
353 crate::catalog::CollectionModel::Hll,
354 )?;
355 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
356 hlls.insert(name.clone(), hll);
357 Ok(RuntimeQueryResult::ok_message(
358 raw_query.to_string(),
359 &format!("HLL '{}' created", name),
360 "create",
361 ))
362 }
363 ProbabilisticCommand::HllAdd { name, elements } => {
364 let mut hlls =
365 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
366 let hll = hlls
367 .get_mut(name)
368 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
369 for elem in elements {
370 hll.add(elem.as_bytes());
371 }
372 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
373 Ok(RuntimeQueryResult::ok_message(
374 raw_query.to_string(),
375 &format!("{} element(s) added to HLL '{}'", elements.len(), name),
376 "insert",
377 ))
378 }
379 ProbabilisticCommand::HllCount { names } => {
380 let hlls =
381 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
382 if names.len() == 1 {
383 let hll = hlls.get(&names[0]).ok_or_else(|| {
384 RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
385 })?;
386 let count = hll.count();
387 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
388 let mut record = UnifiedRecord::new();
389 record.set("count", Value::UnsignedInteger(count));
390 result.push(record);
391 Ok(RuntimeQueryResult {
392 query: raw_query.to_string(),
393 mode: QueryMode::Sql,
394 statement: "hll_count",
395 engine: "runtime-probabilistic",
396 result,
397 affected_rows: 0,
398 statement_type: "select",
399 bookmark: None,
400 })
401 } else {
402 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
404 for name in names {
405 let hll = hlls.get(name).ok_or_else(|| {
406 RedDBError::NotFound(format!("HLL '{}' not found", name))
407 })?;
408 merged.merge(hll);
409 }
410 let count = merged.count();
411 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
412 let mut record = UnifiedRecord::new();
413 record.set("count", Value::UnsignedInteger(count));
414 result.push(record);
415 Ok(RuntimeQueryResult {
416 query: raw_query.to_string(),
417 mode: QueryMode::Sql,
418 statement: "hll_count",
419 engine: "runtime-probabilistic",
420 result,
421 affected_rows: 0,
422 statement_type: "select",
423 bookmark: None,
424 })
425 }
426 }
427 ProbabilisticCommand::HllMerge { dest, sources } => {
428 let mut hlls =
429 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
430 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
431 for src in sources {
432 let hll = hlls
433 .get(src)
434 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
435 merged.merge(hll);
436 }
437 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
438 hlls.insert(dest.clone(), merged);
439 Ok(RuntimeQueryResult::ok_message(
440 raw_query.to_string(),
441 &format!(
442 "HLL '{}' created from merge of {}",
443 dest,
444 sources.join(", ")
445 ),
446 "create",
447 ))
448 }
449 ProbabilisticCommand::HllInfo { name } => {
450 let hlls =
451 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
452 let hll = hlls
453 .get(name)
454 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
455 let mut result = UnifiedResult::with_columns(vec![
456 "name".into(),
457 "precision".into(),
458 "count".into(),
459 "memory_bytes".into(),
460 ]);
461 let mut record = UnifiedRecord::new();
462 record.set("name", Value::text(name.clone()));
463 record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
464 record.set("count", Value::UnsignedInteger(hll.count()));
465 record.set(
466 "memory_bytes",
467 Value::UnsignedInteger(hll.memory_bytes() as u64),
468 );
469 result.push(record);
470 Ok(RuntimeQueryResult {
471 query: raw_query.to_string(),
472 mode: QueryMode::Sql,
473 statement: "hll_info",
474 engine: "runtime-probabilistic",
475 result,
476 affected_rows: 0,
477 statement_type: "select",
478 bookmark: None,
479 })
480 }
481 ProbabilisticCommand::DropHll { name, if_exists } => {
482 let mut hlls =
483 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
484 if hlls.remove(name).is_none() {
485 if *if_exists {
486 return Ok(RuntimeQueryResult::ok_message(
487 raw_query.to_string(),
488 &format!("HLL '{}' does not exist", name),
489 "drop",
490 ));
491 }
492 return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
493 }
494 self.drop_probabilistic_catalog_entry(name)?;
495 self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
496 Ok(RuntimeQueryResult::ok_message(
497 raw_query.to_string(),
498 &format!("HLL '{}' dropped", name),
499 "drop",
500 ))
501 }
502
503 ProbabilisticCommand::CreateSketch {
505 name,
506 width,
507 depth,
508 if_not_exists,
509 } => {
510 let mut sketches = probabilistic_write(
511 &self.inner.probabilistic.sketches,
512 "probabilistic sketch store",
513 );
514 if sketches.contains_key(name) {
515 if *if_not_exists {
516 return Ok(RuntimeQueryResult::ok_message(
517 raw_query.to_string(),
518 &format!("SKETCH '{}' already exists", name),
519 "create",
520 ));
521 }
522 return Err(RedDBError::Query(format!(
523 "SKETCH '{}' already exists",
524 name
525 )));
526 }
527 self.create_probabilistic_catalog_entry(
528 name,
529 crate::catalog::CollectionModel::Sketch,
530 )?;
531 let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
532 *width, *depth,
533 );
534 self.persist_probabilistic_blob(
535 PROB_SKETCH_STATE_PREFIX,
536 name,
537 &sketch.as_bytes(),
538 )?;
539 sketches.insert(name.clone(), sketch);
540 Ok(RuntimeQueryResult::ok_message(
541 raw_query.to_string(),
542 &format!(
543 "SKETCH '{}' created (width={}, depth={})",
544 name, width, depth
545 ),
546 "create",
547 ))
548 }
549 ProbabilisticCommand::SketchAdd {
550 name,
551 element,
552 count,
553 } => {
554 let mut sketches = probabilistic_write(
555 &self.inner.probabilistic.sketches,
556 "probabilistic sketch store",
557 );
558 let sketch = sketches
559 .get_mut(name)
560 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
561 sketch.add(element.as_bytes(), *count);
562 self.persist_probabilistic_blob(
563 PROB_SKETCH_STATE_PREFIX,
564 name,
565 &sketch.as_bytes(),
566 )?;
567 Ok(RuntimeQueryResult::ok_message(
568 raw_query.to_string(),
569 &format!("added {} to SKETCH '{}'", count, name),
570 "insert",
571 ))
572 }
573 ProbabilisticCommand::SketchCount { name, element } => {
574 let sketches = probabilistic_read(
575 &self.inner.probabilistic.sketches,
576 "probabilistic sketch store",
577 );
578 let sketch = sketches
579 .get(name)
580 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
581 let estimate = sketch.estimate(element.as_bytes());
582 let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
583 let mut record = UnifiedRecord::new();
584 record.set("estimate", Value::UnsignedInteger(estimate));
585 result.push(record);
586 Ok(RuntimeQueryResult {
587 query: raw_query.to_string(),
588 mode: QueryMode::Sql,
589 statement: "sketch_count",
590 engine: "runtime-probabilistic",
591 result,
592 affected_rows: 0,
593 statement_type: "select",
594 bookmark: None,
595 })
596 }
597 ProbabilisticCommand::SketchMerge { dest, sources } => {
598 let mut sketches = probabilistic_write(
599 &self.inner.probabilistic.sketches,
600 "probabilistic sketch store",
601 );
602 let first_src = sketches.get(&sources[0]).ok_or_else(|| {
603 RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
604 })?;
605 let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
606 first_src.width(),
607 first_src.depth(),
608 );
609 for src in sources {
610 let sketch = sketches.get(src).ok_or_else(|| {
611 RedDBError::NotFound(format!("SKETCH '{}' not found", src))
612 })?;
613 if !merged.merge(sketch) {
614 return Err(RedDBError::Query(format!(
615 "SKETCH '{}' has incompatible dimensions",
616 src
617 )));
618 }
619 }
620 self.persist_probabilistic_blob(
621 PROB_SKETCH_STATE_PREFIX,
622 dest,
623 &merged.as_bytes(),
624 )?;
625 sketches.insert(dest.clone(), merged);
626 Ok(RuntimeQueryResult::ok_message(
627 raw_query.to_string(),
628 &format!(
629 "SKETCH '{}' created from merge of {}",
630 dest,
631 sources.join(", ")
632 ),
633 "create",
634 ))
635 }
636 ProbabilisticCommand::SketchInfo { name } => {
637 let sketches = probabilistic_read(
638 &self.inner.probabilistic.sketches,
639 "probabilistic sketch store",
640 );
641 let sketch = sketches
642 .get(name)
643 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
644 let mut result = UnifiedResult::with_columns(vec![
645 "name".into(),
646 "width".into(),
647 "depth".into(),
648 "total".into(),
649 "memory_bytes".into(),
650 ]);
651 let mut record = UnifiedRecord::new();
652 record.set("name", Value::text(name.clone()));
653 record.set("width", Value::UnsignedInteger(sketch.width() as u64));
654 record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
655 record.set("total", Value::UnsignedInteger(sketch.total()));
656 record.set(
657 "memory_bytes",
658 Value::UnsignedInteger(sketch.memory_bytes() as u64),
659 );
660 result.push(record);
661 Ok(RuntimeQueryResult {
662 query: raw_query.to_string(),
663 mode: QueryMode::Sql,
664 statement: "sketch_info",
665 engine: "runtime-probabilistic",
666 result,
667 affected_rows: 0,
668 statement_type: "select",
669 bookmark: None,
670 })
671 }
672 ProbabilisticCommand::DropSketch { name, if_exists } => {
673 let mut sketches = probabilistic_write(
674 &self.inner.probabilistic.sketches,
675 "probabilistic sketch store",
676 );
677 if sketches.remove(name).is_none() {
678 if *if_exists {
679 return Ok(RuntimeQueryResult::ok_message(
680 raw_query.to_string(),
681 &format!("SKETCH '{}' does not exist", name),
682 "drop",
683 ));
684 }
685 return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
686 }
687 self.drop_probabilistic_catalog_entry(name)?;
688 self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
689 Ok(RuntimeQueryResult::ok_message(
690 raw_query.to_string(),
691 &format!("SKETCH '{}' dropped", name),
692 "drop",
693 ))
694 }
695
696 ProbabilisticCommand::CreateFilter {
698 name,
699 capacity,
700 if_not_exists,
701 } => {
702 let mut filters = probabilistic_write(
703 &self.inner.probabilistic.filters,
704 "probabilistic filter store",
705 );
706 if filters.contains_key(name) {
707 if *if_not_exists {
708 return Ok(RuntimeQueryResult::ok_message(
709 raw_query.to_string(),
710 &format!("FILTER '{}' already exists", name),
711 "create",
712 ));
713 }
714 return Err(RedDBError::Query(format!(
715 "FILTER '{}' already exists",
716 name
717 )));
718 }
719 self.create_probabilistic_catalog_entry(
720 name,
721 crate::catalog::CollectionModel::Filter,
722 )?;
723 let filter =
724 crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
725 self.persist_probabilistic_blob(
726 PROB_FILTER_STATE_PREFIX,
727 name,
728 &filter.as_bytes(),
729 )?;
730 filters.insert(name.clone(), filter);
731 Ok(RuntimeQueryResult::ok_message(
732 raw_query.to_string(),
733 &format!("FILTER '{}' created (capacity={})", name, capacity),
734 "create",
735 ))
736 }
737 ProbabilisticCommand::FilterAdd { name, element } => {
738 let mut filters = probabilistic_write(
739 &self.inner.probabilistic.filters,
740 "probabilistic filter store",
741 );
742 let filter = filters
743 .get_mut(name)
744 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
745 if !filter.insert(element.as_bytes()) {
746 return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
747 }
748 self.persist_probabilistic_blob(
749 PROB_FILTER_STATE_PREFIX,
750 name,
751 &filter.as_bytes(),
752 )?;
753 Ok(RuntimeQueryResult::ok_message(
754 raw_query.to_string(),
755 &format!("element added to FILTER '{}'", name),
756 "insert",
757 ))
758 }
759 ProbabilisticCommand::FilterCheck { name, element } => {
760 let filters = probabilistic_read(
761 &self.inner.probabilistic.filters,
762 "probabilistic filter store",
763 );
764 let filter = filters
765 .get(name)
766 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
767 let exists = filter.contains(element.as_bytes());
768 let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
769 let mut record = UnifiedRecord::new();
770 record.set("exists", Value::Boolean(exists));
771 result.push(record);
772 Ok(RuntimeQueryResult {
773 query: raw_query.to_string(),
774 mode: QueryMode::Sql,
775 statement: "filter_check",
776 engine: "runtime-probabilistic",
777 result,
778 affected_rows: 0,
779 statement_type: "select",
780 bookmark: None,
781 })
782 }
783 ProbabilisticCommand::FilterDelete { name, element } => {
784 let mut filters = probabilistic_write(
785 &self.inner.probabilistic.filters,
786 "probabilistic filter store",
787 );
788 let filter = filters
789 .get_mut(name)
790 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
791 let removed = filter.delete(element.as_bytes());
792 self.persist_probabilistic_blob(
793 PROB_FILTER_STATE_PREFIX,
794 name,
795 &filter.as_bytes(),
796 )?;
797 Ok(RuntimeQueryResult::ok_message(
798 raw_query.to_string(),
799 &format!(
800 "element {} from FILTER '{}'",
801 if removed { "deleted" } else { "not found in" },
802 name
803 ),
804 "delete",
805 ))
806 }
807 ProbabilisticCommand::FilterCount { name } => {
808 let filters = probabilistic_read(
809 &self.inner.probabilistic.filters,
810 "probabilistic filter store",
811 );
812 let filter = filters
813 .get(name)
814 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
815 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
816 let mut record = UnifiedRecord::new();
817 record.set("count", Value::UnsignedInteger(filter.count() as u64));
818 result.push(record);
819 Ok(RuntimeQueryResult {
820 query: raw_query.to_string(),
821 mode: QueryMode::Sql,
822 statement: "filter_count",
823 engine: "runtime-probabilistic",
824 result,
825 affected_rows: 0,
826 statement_type: "select",
827 bookmark: None,
828 })
829 }
830 ProbabilisticCommand::FilterInfo { name } => {
831 let filters = probabilistic_read(
832 &self.inner.probabilistic.filters,
833 "probabilistic filter store",
834 );
835 let filter = filters
836 .get(name)
837 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
838 let mut result = UnifiedResult::with_columns(vec![
839 "name".into(),
840 "count".into(),
841 "load_factor".into(),
842 "memory_bytes".into(),
843 ]);
844 let mut record = UnifiedRecord::new();
845 record.set("name", Value::text(name.clone()));
846 record.set("count", Value::UnsignedInteger(filter.count() as u64));
847 record.set("load_factor", Value::Float(filter.load_factor()));
848 record.set(
849 "memory_bytes",
850 Value::UnsignedInteger(filter.memory_bytes() as u64),
851 );
852 result.push(record);
853 Ok(RuntimeQueryResult {
854 query: raw_query.to_string(),
855 mode: QueryMode::Sql,
856 statement: "filter_info",
857 engine: "runtime-probabilistic",
858 result,
859 affected_rows: 0,
860 statement_type: "select",
861 bookmark: None,
862 })
863 }
864 ProbabilisticCommand::DropFilter { name, if_exists } => {
865 let mut filters = probabilistic_write(
866 &self.inner.probabilistic.filters,
867 "probabilistic filter store",
868 );
869 if filters.remove(name).is_none() {
870 if *if_exists {
871 return Ok(RuntimeQueryResult::ok_message(
872 raw_query.to_string(),
873 &format!("FILTER '{}' does not exist", name),
874 "drop",
875 ));
876 }
877 return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
878 }
879 self.drop_probabilistic_catalog_entry(name)?;
880 self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
881 Ok(RuntimeQueryResult::ok_message(
882 raw_query.to_string(),
883 &format!("FILTER '{}' dropped", name),
884 "drop",
885 ))
886 }
887 }
888 }
889}
890
891fn parse_probabilistic_read_projection(
892 projection: &Projection,
893 index: usize,
894) -> RedDBResult<Option<ProbabilisticReadProjection>> {
895 if let Some(column) = projection_unqualified_column(projection) {
896 if column.eq_ignore_ascii_case("CARDINALITY") {
897 return Ok(Some(ProbabilisticReadProjection::Cardinality {
898 label: probabilistic_projection_label(projection, "cardinality", index),
899 }));
900 }
901 }
902
903 let Some((function, args)) = projection_function(projection) else {
904 return Ok(None);
905 };
906 if function.eq_ignore_ascii_case("FREQ") {
907 let element = projection_single_text_arg(function, args)?;
908 return Ok(Some(ProbabilisticReadProjection::Freq {
909 element,
910 label: probabilistic_projection_label(projection, "freq", index),
911 }));
912 }
913 if function.eq_ignore_ascii_case("CONTAINS") {
914 let element = projection_single_text_arg(function, args)?;
915 return Ok(Some(ProbabilisticReadProjection::Contains {
916 element,
917 label: probabilistic_projection_label(projection, "contains", index),
918 }));
919 }
920
921 Ok(None)
922}
923
924fn validate_probabilistic_read_model(
925 collection: &str,
926 actual_model: crate::catalog::CollectionModel,
927 projections: &[ProbabilisticReadProjection],
928) -> RedDBResult<()> {
929 for projection in projections {
930 let expected_model = match projection {
931 ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
932 ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
933 ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
934 };
935 if actual_model != expected_model {
936 return Err(RedDBError::Query(format!(
937 "{} is only supported for {} collections; '{}' is {}",
938 probabilistic_projection_form(projection),
939 crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
940 collection,
941 crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
942 )));
943 }
944 }
945 Ok(())
946}
947
948impl RedDBRuntime {
949 fn materialize_probabilistic_select_row(
950 &self,
951 collection: &str,
952 projections: &[ProbabilisticReadProjection],
953 ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
954 let mut columns = Vec::with_capacity(projections.len());
955 let mut record = UnifiedRecord::new();
956 for projection in projections {
957 match projection {
958 ProbabilisticReadProjection::Cardinality { label } => {
959 let hlls = probabilistic_read(
960 &self.inner.probabilistic.hlls,
961 "probabilistic HLL store",
962 );
963 let hll = hlls.get(collection).ok_or_else(|| {
964 RedDBError::NotFound(format!("HLL '{}' not found", collection))
965 })?;
966 columns.push(label.clone());
967 record.set(label, Value::UnsignedInteger(hll.count()));
968 }
969 ProbabilisticReadProjection::Freq { element, label } => {
970 let sketches = probabilistic_read(
971 &self.inner.probabilistic.sketches,
972 "probabilistic sketch store",
973 );
974 let sketch = sketches.get(collection).ok_or_else(|| {
975 RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
976 })?;
977 columns.push(label.clone());
978 record.set(
979 label,
980 Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
981 );
982 }
983 ProbabilisticReadProjection::Contains { element, label } => {
984 let filters = probabilistic_read(
985 &self.inner.probabilistic.filters,
986 "probabilistic filter store",
987 );
988 let filter = filters.get(collection).ok_or_else(|| {
989 RedDBError::NotFound(format!("FILTER '{}' not found", collection))
990 })?;
991 columns.push(label.clone());
992 record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
993 }
994 }
995 }
996 Ok((columns, record))
997 }
998}
999
1000fn probabilistic_select_row_visible(
1001 runtime: &RedDBRuntime,
1002 query: &TableQuery,
1003 record: &UnifiedRecord,
1004) -> bool {
1005 if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
1006 return false;
1007 }
1008 let table_name = query.table.as_str();
1009 let table_alias = query.alias.as_deref().unwrap_or(table_name);
1010 crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
1011 super::join_filter::evaluate_runtime_filter_with_db(
1012 Some(&runtime.inner.db),
1013 record,
1014 &filter,
1015 Some(table_name),
1016 Some(table_alias),
1017 )
1018 })
1019}
1020
1021fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1022 match projection {
1023 Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1024 Some(column.as_str())
1025 }
1026 Projection::Column(column) => Some(column.as_str()),
1027 Projection::Alias(column, _) => Some(column.as_str()),
1028 _ => None,
1029 }
1030}
1031
1032fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1033 match projection {
1034 Projection::Function(name, args) => {
1035 let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1036 Some((function, args.as_slice()))
1037 }
1038 _ => None,
1039 }
1040}
1041
1042fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1043 if args.len() != 1 {
1044 return Err(RedDBError::Query(format!(
1045 "{function}(...) expects exactly one string literal"
1046 )));
1047 }
1048 match &args[0] {
1049 Projection::Column(column) => column
1050 .strip_prefix("LIT:")
1051 .map(ToString::to_string)
1052 .ok_or_else(|| {
1053 RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1054 }),
1055 _ => Err(RedDBError::Query(format!(
1056 "{function}(...) expects a string literal argument"
1057 ))),
1058 }
1059}
1060
1061fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1062 match projection {
1063 Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1064 if alias.eq_ignore_ascii_case(column) =>
1065 {
1066 numbered_probabilistic_label(base, index)
1067 }
1068 Projection::Field(_, Some(alias)) => alias.clone(),
1069 Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1070 numbered_probabilistic_label(base, index)
1071 }
1072 Projection::Alias(_, alias) => alias.clone(),
1073 Projection::Function(name, _) => name
1074 .split_once(':')
1075 .map(|(_, alias)| {
1076 if is_generated_probabilistic_function_label(alias, base) {
1077 numbered_probabilistic_label(base, index)
1078 } else {
1079 alias.to_string()
1080 }
1081 })
1082 .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1083 _ => numbered_probabilistic_label(base, index),
1084 }
1085}
1086
/// Reports whether `alias` looks like an auto-generated label for the `base`
/// function: it starts with `base` (ASCII case-insensitive) immediately
/// followed by `(`.
///
/// Uses `str::get` so an alias whose byte at `base.len()` is not a character
/// boundary yields `false` instead of panicking.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    match alias.get(..base.len()) {
        Some(head) if head.eq_ignore_ascii_case(base) => alias[base.len()..].starts_with('('),
        _ => false,
    }
}
1093
/// Default label for the probabilistic projection at zero-based `index`: the
/// first one is plain `base`, and later ones get a one-based suffix
/// (`base_2`, `base_3`, ...).
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        position => format!("{base}_{}", position + 1),
    }
}
1101
1102fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1103 match projection {
1104 ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1105 ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1106 ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1107 }
1108}