1use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
/// Config-key prefix under which hex-encoded HyperLogLog state is persisted.
/// The hex-encoded structure name is appended to form the full key.
const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
/// Config-key prefix under which hex-encoded count-min sketch state is persisted.
const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
/// Config-key prefix under which hex-encoded cuckoo filter state is persisted.
const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11 lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15 lock.write()
16}
17
18fn probabilistic_collection_contract(
19 name: &str,
20 model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22 let now = crate::utils::now_unix_millis() as u128;
23 crate::physical::CollectionContract {
24 name: name.to_string(),
25 declared_model: model,
26 schema_mode: crate::catalog::SchemaMode::Dynamic,
27 origin: crate::physical::ContractOrigin::Explicit,
28 version: 1,
29 created_at_unix_ms: now,
30 updated_at_unix_ms: now,
31 default_ttl_ms: None,
32 vector_dimension: None,
33 vector_metric: None,
34 context_index_fields: Vec::new(),
35 declared_columns: Vec::new(),
36 table_def: None,
37 timestamps_enabled: false,
38 context_index_enabled: false,
39 metrics_raw_retention_ms: None,
40 metrics_rollup_policies: Vec::new(),
41 metrics_tenant_identity: None,
42 metrics_namespace: None,
43 append_only: false,
44 subscriptions: Vec::new(),
45 session_key: None,
46 session_gap_ms: None,
47 retention_duration_ms: None,
48 }
49}
50
/// A probabilistic read form recognised in a `SELECT` projection list.
///
/// In every variant `label` is the output column name for the value, and
/// `element` (where present) is the string-literal argument being probed.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProbabilisticReadProjection {
    /// `CARDINALITY` — estimated distinct count; valid on HLL collections.
    Cardinality { label: String },
    /// `FREQ('element')` — estimated frequency; valid on SKETCH collections.
    Freq { element: String, label: String },
    /// `CONTAINS('element')` — membership test; valid on FILTER collections.
    Contains { element: String, label: String },
}
56
57impl RedDBRuntime {
58 pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
59 {
60 let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
61 let mut hlls =
62 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
63 for (name, data_hex) in entries {
64 let bytes = hex::decode(&data_hex).map_err(|err| {
65 RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
66 })?;
67 let Some(hll) =
68 crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
69 else {
70 return Err(RedDBError::Internal(format!(
71 "invalid persisted HLL state for '{name}'"
72 )));
73 };
74 hlls.insert(name, hll);
75 }
76 }
77
78 {
79 let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
80 let mut sketches = probabilistic_write(
81 &self.inner.probabilistic.sketches,
82 "probabilistic sketch store",
83 );
84 for (name, data_hex) in entries {
85 let bytes = hex::decode(&data_hex).map_err(|err| {
86 RedDBError::Internal(format!(
87 "invalid persisted SKETCH state for '{name}': {err}"
88 ))
89 })?;
90 let sketch =
91 crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
92 &bytes,
93 )
94 .ok_or_else(|| {
95 RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
96 })?;
97 sketches.insert(name, sketch);
98 }
99 }
100
101 {
102 let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
103 let mut filters = probabilistic_write(
104 &self.inner.probabilistic.filters,
105 "probabilistic filter store",
106 );
107 for (name, data_hex) in entries {
108 let bytes = hex::decode(&data_hex).map_err(|err| {
109 RedDBError::Internal(format!(
110 "invalid persisted FILTER state for '{name}': {err}"
111 ))
112 })?;
113 let filter =
114 crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
115 .ok_or_else(|| {
116 RedDBError::Internal(format!(
117 "invalid persisted FILTER state for '{name}'"
118 ))
119 })?;
120 filters.insert(name, filter);
121 }
122 }
123
124 Ok(())
125 }
126
127 fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
128 let Some(manager) = self.inner.db.store().get_collection("red_config") else {
129 return Vec::new();
130 };
131 let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
132 std::collections::HashMap::new();
133 for entity in manager.query_all(|_| true) {
134 let EntityData::Row(row) = &entity.data else {
135 continue;
136 };
137 let Some(named) = &row.named else {
138 continue;
139 };
140 let Some(Value::Text(key)) = named.get("key") else {
141 continue;
142 };
143 let Some(encoded_name) = key.strip_prefix(prefix) else {
144 continue;
145 };
146 let value = match named.get("value") {
147 Some(Value::Text(value)) => Some(value.to_string()),
148 Some(Value::Null) => None,
149 _ => continue,
150 };
151 let entity_id = entity.id.raw();
152 match latest.get(encoded_name) {
153 Some((existing_id, _)) if *existing_id > entity_id => {}
154 _ => {
155 latest.insert(encoded_name.to_string(), (entity_id, value));
156 }
157 }
158 }
159
160 latest
161 .into_iter()
162 .filter_map(|(encoded_name, (_, value))| {
163 let value = value?;
164 let bytes = hex::decode(encoded_name).ok()?;
165 let name = String::from_utf8(bytes).ok()?;
166 Some((name, value))
167 })
168 .collect()
169 }
170
171 fn persist_probabilistic_blob(
172 &self,
173 prefix: &str,
174 name: &str,
175 bytes: &[u8],
176 ) -> RedDBResult<()> {
177 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
178 self.inner
179 .db
180 .store()
181 .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
182 Ok(())
183 }
184
185 fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
186 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
187 self.inner
188 .db
189 .store()
190 .set_config_tree(&key, &crate::serde_json::Value::Null);
191 Ok(())
192 }
193
194 fn create_probabilistic_catalog_entry(
195 &self,
196 name: &str,
197 model: crate::catalog::CollectionModel,
198 ) -> RedDBResult<()> {
199 let store = self.inner.db.store();
200 store
201 .create_collection(name)
202 .map_err(|err| RedDBError::Internal(err.to_string()))?;
203 self.inner
204 .db
205 .save_collection_contract(probabilistic_collection_contract(name, model))
206 .map_err(|err| RedDBError::Internal(err.to_string()))?;
207 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
208 store.set_config_tree(
209 &format!("red.collection_tenants.{name}"),
210 &crate::serde_json::Value::String(tenant_id),
211 );
212 }
213 self.inner
214 .db
215 .persist_metadata()
216 .map_err(|err| RedDBError::Internal(err.to_string()))?;
217 self.invalidate_result_cache();
218 Ok(())
219 }
220
221 fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
222 let store = self.inner.db.store();
223 if store.get_collection(name).is_some() {
224 store
225 .drop_collection(name)
226 .map_err(|err| RedDBError::Internal(err.to_string()))?;
227 }
228 self.inner
229 .db
230 .remove_collection_contract(name)
231 .map_err(|err| RedDBError::Internal(err.to_string()))?;
232 self.inner
233 .db
234 .persist_metadata()
235 .map_err(|err| RedDBError::Internal(err.to_string()))?;
236 self.invalidate_result_cache();
237 Ok(())
238 }
239
240 pub(crate) fn execute_probabilistic_select(
241 &self,
242 query: &TableQuery,
243 ) -> RedDBResult<Option<UnifiedResult>> {
244 let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
245 let mut read_projections = Vec::new();
246 for projection in &projections {
247 if let Some(read_projection) =
248 parse_probabilistic_read_projection(projection, read_projections.len())?
249 {
250 read_projections.push(read_projection);
251 }
252 }
253
254 let Some(actual_model) = self
255 .inner
256 .db
257 .collection_contract(&query.table)
258 .map(|contract| contract.declared_model)
259 else {
260 return if read_projections.is_empty() {
261 Ok(None)
262 } else {
263 Err(RedDBError::NotFound(format!(
264 "probabilistic collection '{}' not found",
265 query.table
266 )))
267 };
268 };
269
270 let is_probabilistic_model = matches!(
271 actual_model,
272 crate::catalog::CollectionModel::Hll
273 | crate::catalog::CollectionModel::Sketch
274 | crate::catalog::CollectionModel::Filter
275 );
276 if read_projections.is_empty() {
277 return if is_probabilistic_model {
278 Err(RedDBError::Query(format!(
279 "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
280 query.table
281 )))
282 } else {
283 Ok(None)
284 };
285 }
286
287 validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
288 let (columns, record) =
289 self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
290 let mut result = UnifiedResult::with_columns(columns);
291 if probabilistic_select_row_visible(self, query, &record) {
292 result.push(record);
293 }
294 Ok(Some(result))
295 }
296
297 pub fn execute_probabilistic_command(
298 &self,
299 raw_query: &str,
300 cmd: &ProbabilisticCommand,
301 ) -> RedDBResult<RuntimeQueryResult> {
302 let is_mutation = matches!(
306 cmd,
307 ProbabilisticCommand::CreateHll { .. }
308 | ProbabilisticCommand::HllAdd { .. }
309 | ProbabilisticCommand::HllMerge { .. }
310 | ProbabilisticCommand::DropHll { .. }
311 | ProbabilisticCommand::CreateSketch { .. }
312 | ProbabilisticCommand::SketchAdd { .. }
313 | ProbabilisticCommand::SketchMerge { .. }
314 | ProbabilisticCommand::DropSketch { .. }
315 | ProbabilisticCommand::CreateFilter { .. }
316 | ProbabilisticCommand::FilterAdd { .. }
317 | ProbabilisticCommand::FilterDelete { .. }
318 | ProbabilisticCommand::DropFilter { .. }
319 );
320 if is_mutation {
321 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
322 }
323 match cmd {
324 ProbabilisticCommand::CreateHll {
326 name,
327 precision,
328 if_not_exists,
329 } => {
330 let mut hlls =
331 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
332 if hlls.contains_key(name) {
333 if *if_not_exists {
334 return Ok(RuntimeQueryResult::ok_message(
335 raw_query.to_string(),
336 &format!("HLL '{}' already exists", name),
337 "create",
338 ));
339 }
340 return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
341 }
342 let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
343 *precision,
344 )
345 .ok_or_else(|| {
346 RedDBError::Query(format!(
347 "HLL precision must be between 4 and 18, got {precision}"
348 ))
349 })?;
350 self.create_probabilistic_catalog_entry(
351 name,
352 crate::catalog::CollectionModel::Hll,
353 )?;
354 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
355 hlls.insert(name.clone(), hll);
356 Ok(RuntimeQueryResult::ok_message(
357 raw_query.to_string(),
358 &format!("HLL '{}' created", name),
359 "create",
360 ))
361 }
362 ProbabilisticCommand::HllAdd { name, elements } => {
363 let mut hlls =
364 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
365 let hll = hlls
366 .get_mut(name)
367 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
368 for elem in elements {
369 hll.add(elem.as_bytes());
370 }
371 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
372 Ok(RuntimeQueryResult::ok_message(
373 raw_query.to_string(),
374 &format!("{} element(s) added to HLL '{}'", elements.len(), name),
375 "insert",
376 ))
377 }
378 ProbabilisticCommand::HllCount { names } => {
379 let hlls =
380 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
381 if names.len() == 1 {
382 let hll = hlls.get(&names[0]).ok_or_else(|| {
383 RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
384 })?;
385 let count = hll.count();
386 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
387 let mut record = UnifiedRecord::new();
388 record.set("count", Value::UnsignedInteger(count));
389 result.push(record);
390 Ok(RuntimeQueryResult {
391 query: raw_query.to_string(),
392 mode: QueryMode::Sql,
393 statement: "hll_count",
394 engine: "runtime-probabilistic",
395 result,
396 affected_rows: 0,
397 statement_type: "select",
398 })
399 } else {
400 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
402 for name in names {
403 let hll = hlls.get(name).ok_or_else(|| {
404 RedDBError::NotFound(format!("HLL '{}' not found", name))
405 })?;
406 merged.merge(hll);
407 }
408 let count = merged.count();
409 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
410 let mut record = UnifiedRecord::new();
411 record.set("count", Value::UnsignedInteger(count));
412 result.push(record);
413 Ok(RuntimeQueryResult {
414 query: raw_query.to_string(),
415 mode: QueryMode::Sql,
416 statement: "hll_count",
417 engine: "runtime-probabilistic",
418 result,
419 affected_rows: 0,
420 statement_type: "select",
421 })
422 }
423 }
424 ProbabilisticCommand::HllMerge { dest, sources } => {
425 let mut hlls =
426 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
427 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
428 for src in sources {
429 let hll = hlls
430 .get(src)
431 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
432 merged.merge(hll);
433 }
434 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
435 hlls.insert(dest.clone(), merged);
436 Ok(RuntimeQueryResult::ok_message(
437 raw_query.to_string(),
438 &format!(
439 "HLL '{}' created from merge of {}",
440 dest,
441 sources.join(", ")
442 ),
443 "create",
444 ))
445 }
446 ProbabilisticCommand::HllInfo { name } => {
447 let hlls =
448 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
449 let hll = hlls
450 .get(name)
451 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
452 let mut result = UnifiedResult::with_columns(vec![
453 "name".into(),
454 "precision".into(),
455 "count".into(),
456 "memory_bytes".into(),
457 ]);
458 let mut record = UnifiedRecord::new();
459 record.set("name", Value::text(name.clone()));
460 record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
461 record.set("count", Value::UnsignedInteger(hll.count()));
462 record.set(
463 "memory_bytes",
464 Value::UnsignedInteger(hll.memory_bytes() as u64),
465 );
466 result.push(record);
467 Ok(RuntimeQueryResult {
468 query: raw_query.to_string(),
469 mode: QueryMode::Sql,
470 statement: "hll_info",
471 engine: "runtime-probabilistic",
472 result,
473 affected_rows: 0,
474 statement_type: "select",
475 })
476 }
477 ProbabilisticCommand::DropHll { name, if_exists } => {
478 let mut hlls =
479 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
480 if hlls.remove(name).is_none() {
481 if *if_exists {
482 return Ok(RuntimeQueryResult::ok_message(
483 raw_query.to_string(),
484 &format!("HLL '{}' does not exist", name),
485 "drop",
486 ));
487 }
488 return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
489 }
490 self.drop_probabilistic_catalog_entry(name)?;
491 self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
492 Ok(RuntimeQueryResult::ok_message(
493 raw_query.to_string(),
494 &format!("HLL '{}' dropped", name),
495 "drop",
496 ))
497 }
498
499 ProbabilisticCommand::CreateSketch {
501 name,
502 width,
503 depth,
504 if_not_exists,
505 } => {
506 let mut sketches = probabilistic_write(
507 &self.inner.probabilistic.sketches,
508 "probabilistic sketch store",
509 );
510 if sketches.contains_key(name) {
511 if *if_not_exists {
512 return Ok(RuntimeQueryResult::ok_message(
513 raw_query.to_string(),
514 &format!("SKETCH '{}' already exists", name),
515 "create",
516 ));
517 }
518 return Err(RedDBError::Query(format!(
519 "SKETCH '{}' already exists",
520 name
521 )));
522 }
523 self.create_probabilistic_catalog_entry(
524 name,
525 crate::catalog::CollectionModel::Sketch,
526 )?;
527 let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
528 *width, *depth,
529 );
530 self.persist_probabilistic_blob(
531 PROB_SKETCH_STATE_PREFIX,
532 name,
533 &sketch.as_bytes(),
534 )?;
535 sketches.insert(name.clone(), sketch);
536 Ok(RuntimeQueryResult::ok_message(
537 raw_query.to_string(),
538 &format!(
539 "SKETCH '{}' created (width={}, depth={})",
540 name, width, depth
541 ),
542 "create",
543 ))
544 }
545 ProbabilisticCommand::SketchAdd {
546 name,
547 element,
548 count,
549 } => {
550 let mut sketches = probabilistic_write(
551 &self.inner.probabilistic.sketches,
552 "probabilistic sketch store",
553 );
554 let sketch = sketches
555 .get_mut(name)
556 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
557 sketch.add(element.as_bytes(), *count);
558 self.persist_probabilistic_blob(
559 PROB_SKETCH_STATE_PREFIX,
560 name,
561 &sketch.as_bytes(),
562 )?;
563 Ok(RuntimeQueryResult::ok_message(
564 raw_query.to_string(),
565 &format!("added {} to SKETCH '{}'", count, name),
566 "insert",
567 ))
568 }
569 ProbabilisticCommand::SketchCount { name, element } => {
570 let sketches = probabilistic_read(
571 &self.inner.probabilistic.sketches,
572 "probabilistic sketch store",
573 );
574 let sketch = sketches
575 .get(name)
576 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
577 let estimate = sketch.estimate(element.as_bytes());
578 let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
579 let mut record = UnifiedRecord::new();
580 record.set("estimate", Value::UnsignedInteger(estimate));
581 result.push(record);
582 Ok(RuntimeQueryResult {
583 query: raw_query.to_string(),
584 mode: QueryMode::Sql,
585 statement: "sketch_count",
586 engine: "runtime-probabilistic",
587 result,
588 affected_rows: 0,
589 statement_type: "select",
590 })
591 }
592 ProbabilisticCommand::SketchMerge { dest, sources } => {
593 let mut sketches = probabilistic_write(
594 &self.inner.probabilistic.sketches,
595 "probabilistic sketch store",
596 );
597 let first_src = sketches.get(&sources[0]).ok_or_else(|| {
598 RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
599 })?;
600 let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
601 first_src.width(),
602 first_src.depth(),
603 );
604 for src in sources {
605 let sketch = sketches.get(src).ok_or_else(|| {
606 RedDBError::NotFound(format!("SKETCH '{}' not found", src))
607 })?;
608 if !merged.merge(sketch) {
609 return Err(RedDBError::Query(format!(
610 "SKETCH '{}' has incompatible dimensions",
611 src
612 )));
613 }
614 }
615 self.persist_probabilistic_blob(
616 PROB_SKETCH_STATE_PREFIX,
617 dest,
618 &merged.as_bytes(),
619 )?;
620 sketches.insert(dest.clone(), merged);
621 Ok(RuntimeQueryResult::ok_message(
622 raw_query.to_string(),
623 &format!(
624 "SKETCH '{}' created from merge of {}",
625 dest,
626 sources.join(", ")
627 ),
628 "create",
629 ))
630 }
631 ProbabilisticCommand::SketchInfo { name } => {
632 let sketches = probabilistic_read(
633 &self.inner.probabilistic.sketches,
634 "probabilistic sketch store",
635 );
636 let sketch = sketches
637 .get(name)
638 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
639 let mut result = UnifiedResult::with_columns(vec![
640 "name".into(),
641 "width".into(),
642 "depth".into(),
643 "total".into(),
644 "memory_bytes".into(),
645 ]);
646 let mut record = UnifiedRecord::new();
647 record.set("name", Value::text(name.clone()));
648 record.set("width", Value::UnsignedInteger(sketch.width() as u64));
649 record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
650 record.set("total", Value::UnsignedInteger(sketch.total()));
651 record.set(
652 "memory_bytes",
653 Value::UnsignedInteger(sketch.memory_bytes() as u64),
654 );
655 result.push(record);
656 Ok(RuntimeQueryResult {
657 query: raw_query.to_string(),
658 mode: QueryMode::Sql,
659 statement: "sketch_info",
660 engine: "runtime-probabilistic",
661 result,
662 affected_rows: 0,
663 statement_type: "select",
664 })
665 }
666 ProbabilisticCommand::DropSketch { name, if_exists } => {
667 let mut sketches = probabilistic_write(
668 &self.inner.probabilistic.sketches,
669 "probabilistic sketch store",
670 );
671 if sketches.remove(name).is_none() {
672 if *if_exists {
673 return Ok(RuntimeQueryResult::ok_message(
674 raw_query.to_string(),
675 &format!("SKETCH '{}' does not exist", name),
676 "drop",
677 ));
678 }
679 return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
680 }
681 self.drop_probabilistic_catalog_entry(name)?;
682 self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
683 Ok(RuntimeQueryResult::ok_message(
684 raw_query.to_string(),
685 &format!("SKETCH '{}' dropped", name),
686 "drop",
687 ))
688 }
689
690 ProbabilisticCommand::CreateFilter {
692 name,
693 capacity,
694 if_not_exists,
695 } => {
696 let mut filters = probabilistic_write(
697 &self.inner.probabilistic.filters,
698 "probabilistic filter store",
699 );
700 if filters.contains_key(name) {
701 if *if_not_exists {
702 return Ok(RuntimeQueryResult::ok_message(
703 raw_query.to_string(),
704 &format!("FILTER '{}' already exists", name),
705 "create",
706 ));
707 }
708 return Err(RedDBError::Query(format!(
709 "FILTER '{}' already exists",
710 name
711 )));
712 }
713 self.create_probabilistic_catalog_entry(
714 name,
715 crate::catalog::CollectionModel::Filter,
716 )?;
717 let filter =
718 crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
719 self.persist_probabilistic_blob(
720 PROB_FILTER_STATE_PREFIX,
721 name,
722 &filter.as_bytes(),
723 )?;
724 filters.insert(name.clone(), filter);
725 Ok(RuntimeQueryResult::ok_message(
726 raw_query.to_string(),
727 &format!("FILTER '{}' created (capacity={})", name, capacity),
728 "create",
729 ))
730 }
731 ProbabilisticCommand::FilterAdd { name, element } => {
732 let mut filters = probabilistic_write(
733 &self.inner.probabilistic.filters,
734 "probabilistic filter store",
735 );
736 let filter = filters
737 .get_mut(name)
738 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
739 if !filter.insert(element.as_bytes()) {
740 return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
741 }
742 self.persist_probabilistic_blob(
743 PROB_FILTER_STATE_PREFIX,
744 name,
745 &filter.as_bytes(),
746 )?;
747 Ok(RuntimeQueryResult::ok_message(
748 raw_query.to_string(),
749 &format!("element added to FILTER '{}'", name),
750 "insert",
751 ))
752 }
753 ProbabilisticCommand::FilterCheck { name, element } => {
754 let filters = probabilistic_read(
755 &self.inner.probabilistic.filters,
756 "probabilistic filter store",
757 );
758 let filter = filters
759 .get(name)
760 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
761 let exists = filter.contains(element.as_bytes());
762 let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
763 let mut record = UnifiedRecord::new();
764 record.set("exists", Value::Boolean(exists));
765 result.push(record);
766 Ok(RuntimeQueryResult {
767 query: raw_query.to_string(),
768 mode: QueryMode::Sql,
769 statement: "filter_check",
770 engine: "runtime-probabilistic",
771 result,
772 affected_rows: 0,
773 statement_type: "select",
774 })
775 }
776 ProbabilisticCommand::FilterDelete { name, element } => {
777 let mut filters = probabilistic_write(
778 &self.inner.probabilistic.filters,
779 "probabilistic filter store",
780 );
781 let filter = filters
782 .get_mut(name)
783 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
784 let removed = filter.delete(element.as_bytes());
785 self.persist_probabilistic_blob(
786 PROB_FILTER_STATE_PREFIX,
787 name,
788 &filter.as_bytes(),
789 )?;
790 Ok(RuntimeQueryResult::ok_message(
791 raw_query.to_string(),
792 &format!(
793 "element {} from FILTER '{}'",
794 if removed { "deleted" } else { "not found in" },
795 name
796 ),
797 "delete",
798 ))
799 }
800 ProbabilisticCommand::FilterCount { name } => {
801 let filters = probabilistic_read(
802 &self.inner.probabilistic.filters,
803 "probabilistic filter store",
804 );
805 let filter = filters
806 .get(name)
807 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
808 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
809 let mut record = UnifiedRecord::new();
810 record.set("count", Value::UnsignedInteger(filter.count() as u64));
811 result.push(record);
812 Ok(RuntimeQueryResult {
813 query: raw_query.to_string(),
814 mode: QueryMode::Sql,
815 statement: "filter_count",
816 engine: "runtime-probabilistic",
817 result,
818 affected_rows: 0,
819 statement_type: "select",
820 })
821 }
822 ProbabilisticCommand::FilterInfo { name } => {
823 let filters = probabilistic_read(
824 &self.inner.probabilistic.filters,
825 "probabilistic filter store",
826 );
827 let filter = filters
828 .get(name)
829 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
830 let mut result = UnifiedResult::with_columns(vec![
831 "name".into(),
832 "count".into(),
833 "load_factor".into(),
834 "memory_bytes".into(),
835 ]);
836 let mut record = UnifiedRecord::new();
837 record.set("name", Value::text(name.clone()));
838 record.set("count", Value::UnsignedInteger(filter.count() as u64));
839 record.set("load_factor", Value::Float(filter.load_factor()));
840 record.set(
841 "memory_bytes",
842 Value::UnsignedInteger(filter.memory_bytes() as u64),
843 );
844 result.push(record);
845 Ok(RuntimeQueryResult {
846 query: raw_query.to_string(),
847 mode: QueryMode::Sql,
848 statement: "filter_info",
849 engine: "runtime-probabilistic",
850 result,
851 affected_rows: 0,
852 statement_type: "select",
853 })
854 }
855 ProbabilisticCommand::DropFilter { name, if_exists } => {
856 let mut filters = probabilistic_write(
857 &self.inner.probabilistic.filters,
858 "probabilistic filter store",
859 );
860 if filters.remove(name).is_none() {
861 if *if_exists {
862 return Ok(RuntimeQueryResult::ok_message(
863 raw_query.to_string(),
864 &format!("FILTER '{}' does not exist", name),
865 "drop",
866 ));
867 }
868 return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
869 }
870 self.drop_probabilistic_catalog_entry(name)?;
871 self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
872 Ok(RuntimeQueryResult::ok_message(
873 raw_query.to_string(),
874 &format!("FILTER '{}' dropped", name),
875 "drop",
876 ))
877 }
878 }
879 }
880}
881
882fn parse_probabilistic_read_projection(
883 projection: &Projection,
884 index: usize,
885) -> RedDBResult<Option<ProbabilisticReadProjection>> {
886 if let Some(column) = projection_unqualified_column(projection) {
887 if column.eq_ignore_ascii_case("CARDINALITY") {
888 return Ok(Some(ProbabilisticReadProjection::Cardinality {
889 label: probabilistic_projection_label(projection, "cardinality", index),
890 }));
891 }
892 }
893
894 let Some((function, args)) = projection_function(projection) else {
895 return Ok(None);
896 };
897 if function.eq_ignore_ascii_case("FREQ") {
898 let element = projection_single_text_arg(function, args)?;
899 return Ok(Some(ProbabilisticReadProjection::Freq {
900 element,
901 label: probabilistic_projection_label(projection, "freq", index),
902 }));
903 }
904 if function.eq_ignore_ascii_case("CONTAINS") {
905 let element = projection_single_text_arg(function, args)?;
906 return Ok(Some(ProbabilisticReadProjection::Contains {
907 element,
908 label: probabilistic_projection_label(projection, "contains", index),
909 }));
910 }
911
912 Ok(None)
913}
914
915fn validate_probabilistic_read_model(
916 collection: &str,
917 actual_model: crate::catalog::CollectionModel,
918 projections: &[ProbabilisticReadProjection],
919) -> RedDBResult<()> {
920 for projection in projections {
921 let expected_model = match projection {
922 ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
923 ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
924 ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
925 };
926 if actual_model != expected_model {
927 return Err(RedDBError::Query(format!(
928 "{} is only supported for {} collections; '{}' is {}",
929 probabilistic_projection_form(projection),
930 crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
931 collection,
932 crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
933 )));
934 }
935 }
936 Ok(())
937}
938
939impl RedDBRuntime {
940 fn materialize_probabilistic_select_row(
941 &self,
942 collection: &str,
943 projections: &[ProbabilisticReadProjection],
944 ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
945 let mut columns = Vec::with_capacity(projections.len());
946 let mut record = UnifiedRecord::new();
947 for projection in projections {
948 match projection {
949 ProbabilisticReadProjection::Cardinality { label } => {
950 let hlls = probabilistic_read(
951 &self.inner.probabilistic.hlls,
952 "probabilistic HLL store",
953 );
954 let hll = hlls.get(collection).ok_or_else(|| {
955 RedDBError::NotFound(format!("HLL '{}' not found", collection))
956 })?;
957 columns.push(label.clone());
958 record.set(label, Value::UnsignedInteger(hll.count()));
959 }
960 ProbabilisticReadProjection::Freq { element, label } => {
961 let sketches = probabilistic_read(
962 &self.inner.probabilistic.sketches,
963 "probabilistic sketch store",
964 );
965 let sketch = sketches.get(collection).ok_or_else(|| {
966 RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
967 })?;
968 columns.push(label.clone());
969 record.set(
970 label,
971 Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
972 );
973 }
974 ProbabilisticReadProjection::Contains { element, label } => {
975 let filters = probabilistic_read(
976 &self.inner.probabilistic.filters,
977 "probabilistic filter store",
978 );
979 let filter = filters.get(collection).ok_or_else(|| {
980 RedDBError::NotFound(format!("FILTER '{}' not found", collection))
981 })?;
982 columns.push(label.clone());
983 record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
984 }
985 }
986 }
987 Ok((columns, record))
988 }
989}
990
991fn probabilistic_select_row_visible(
992 runtime: &RedDBRuntime,
993 query: &TableQuery,
994 record: &UnifiedRecord,
995) -> bool {
996 if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
997 return false;
998 }
999 let table_name = query.table.as_str();
1000 let table_alias = query.alias.as_deref().unwrap_or(table_name);
1001 crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
1002 super::join_filter::evaluate_runtime_filter_with_db(
1003 Some(&runtime.inner.db),
1004 record,
1005 &filter,
1006 Some(table_name),
1007 Some(table_alias),
1008 )
1009 })
1010}
1011
1012fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1013 match projection {
1014 Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1015 Some(column.as_str())
1016 }
1017 Projection::Column(column) => Some(column.as_str()),
1018 Projection::Alias(column, _) => Some(column.as_str()),
1019 _ => None,
1020 }
1021}
1022
1023fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1024 match projection {
1025 Projection::Function(name, args) => {
1026 let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1027 Some((function, args.as_slice()))
1028 }
1029 _ => None,
1030 }
1031}
1032
1033fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1034 if args.len() != 1 {
1035 return Err(RedDBError::Query(format!(
1036 "{function}(...) expects exactly one string literal"
1037 )));
1038 }
1039 match &args[0] {
1040 Projection::Column(column) => column
1041 .strip_prefix("LIT:")
1042 .map(ToString::to_string)
1043 .ok_or_else(|| {
1044 RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1045 }),
1046 _ => Err(RedDBError::Query(format!(
1047 "{function}(...) expects a string literal argument"
1048 ))),
1049 }
1050}
1051
1052fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1053 match projection {
1054 Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1055 if alias.eq_ignore_ascii_case(column) =>
1056 {
1057 numbered_probabilistic_label(base, index)
1058 }
1059 Projection::Field(_, Some(alias)) => alias.clone(),
1060 Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1061 numbered_probabilistic_label(base, index)
1062 }
1063 Projection::Alias(_, alias) => alias.clone(),
1064 Projection::Function(name, _) => name
1065 .split_once(':')
1066 .map(|(_, alias)| {
1067 if is_generated_probabilistic_function_label(alias, base) {
1068 numbered_probabilistic_label(base, index)
1069 } else {
1070 alias.to_string()
1071 }
1072 })
1073 .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1074 _ => numbered_probabilistic_label(base, index),
1075 }
1076}
1077
/// Reports whether `alias` has the generated form `base(`...: it starts with
/// `base` (ASCII case-insensitively) immediately followed by an opening
/// parenthesis.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    let split = base.len();
    // Rejects aliases shorter than `base` and splits that would cut a character.
    if !alias.is_char_boundary(split) {
        return false;
    }
    let (head, tail) = alias.split_at(split);
    head.eq_ignore_ascii_case(base) && tail.starts_with('(')
}
1084
/// Generates the default label for the read form at position `index`.
///
/// The first form (`index == 0`) is labelled `base`; later forms get
/// `base_<index + 1>`, e.g. `freq_2` for the second one.
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        position => format!("{base}_{}", position + 1),
    }
}
1092
1093fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1094 match projection {
1095 ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1096 ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1097 ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1098 }
1099}