1use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
/// Config-key prefix under which serialized HLL state is persisted; the
/// hex-encoded structure name is appended to form the full key.
const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
/// Config-key prefix under which serialized count-min sketch state is
/// persisted; the hex-encoded structure name is appended to form the full key.
const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
/// Config-key prefix under which serialized cuckoo filter state is persisted;
/// the hex-encoded structure name is appended to form the full key.
const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11 lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15 lock.write()
16}
17
18fn probabilistic_collection_contract(
19 name: &str,
20 model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22 let now = crate::utils::now_unix_millis() as u128;
23 crate::physical::CollectionContract {
24 name: name.to_string(),
25 declared_model: model,
26 schema_mode: crate::catalog::SchemaMode::Dynamic,
27 origin: crate::physical::ContractOrigin::Explicit,
28 version: 1,
29 created_at_unix_ms: now,
30 updated_at_unix_ms: now,
31 default_ttl_ms: None,
32 vector_dimension: None,
33 vector_metric: None,
34 context_index_fields: Vec::new(),
35 declared_columns: Vec::new(),
36 table_def: None,
37 timestamps_enabled: false,
38 context_index_enabled: false,
39 metrics_raw_retention_ms: None,
40 metrics_rollup_policies: Vec::new(),
41 metrics_tenant_identity: None,
42 metrics_namespace: None,
43 append_only: false,
44 subscriptions: Vec::new(),
45 analytics_config: Vec::new(),
46 session_key: None,
47 session_gap_ms: None,
48 retention_duration_ms: None,
49 analytical_storage: None,
50
51 ai_policy: None,
52 }
53}
54
/// A SELECT projection recognised as a read against a probabilistic
/// collection. Each variant carries the output column label.
enum ProbabilisticReadProjection {
    /// `CARDINALITY` column; validated against HLL collections.
    Cardinality { label: String },
    /// `FREQ(<literal>)`; validated against sketch collections. `element` is
    /// the literal whose frequency is estimated.
    Freq { element: String, label: String },
    /// `CONTAINS(<literal>)`; validated against filter collections. `element`
    /// is the literal whose membership is tested.
    Contains { element: String, label: String },
}
60
61impl RedDBRuntime {
62 pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
63 {
64 let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
65 let mut hlls =
66 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
67 for (name, data_hex) in entries {
68 let bytes = hex::decode(&data_hex).map_err(|err| {
69 RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
70 })?;
71 let Some(hll) =
72 crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
73 else {
74 return Err(RedDBError::Internal(format!(
75 "invalid persisted HLL state for '{name}'"
76 )));
77 };
78 hlls.insert(name, hll);
79 }
80 }
81
82 {
83 let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
84 let mut sketches = probabilistic_write(
85 &self.inner.probabilistic.sketches,
86 "probabilistic sketch store",
87 );
88 for (name, data_hex) in entries {
89 let bytes = hex::decode(&data_hex).map_err(|err| {
90 RedDBError::Internal(format!(
91 "invalid persisted SKETCH state for '{name}': {err}"
92 ))
93 })?;
94 let sketch =
95 crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
96 &bytes,
97 )
98 .ok_or_else(|| {
99 RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
100 })?;
101 sketches.insert(name, sketch);
102 }
103 }
104
105 {
106 let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
107 let mut filters = probabilistic_write(
108 &self.inner.probabilistic.filters,
109 "probabilistic filter store",
110 );
111 for (name, data_hex) in entries {
112 let bytes = hex::decode(&data_hex).map_err(|err| {
113 RedDBError::Internal(format!(
114 "invalid persisted FILTER state for '{name}': {err}"
115 ))
116 })?;
117 let filter =
118 crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
119 .ok_or_else(|| {
120 RedDBError::Internal(format!(
121 "invalid persisted FILTER state for '{name}'"
122 ))
123 })?;
124 filters.insert(name, filter);
125 }
126 }
127
128 Ok(())
129 }
130
131 fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
132 let Some(manager) = self.inner.db.store().get_collection("red_config") else {
133 return Vec::new();
134 };
135 let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
136 std::collections::HashMap::new();
137 for entity in manager.query_all(|_| true) {
138 let EntityData::Row(row) = &entity.data else {
139 continue;
140 };
141 let Some(named) = &row.named else {
142 continue;
143 };
144 let Some(Value::Text(key)) = named.get("key") else {
145 continue;
146 };
147 let Some(encoded_name) = key.strip_prefix(prefix) else {
148 continue;
149 };
150 let value = match named.get("value") {
151 Some(Value::Text(value)) => Some(value.to_string()),
152 Some(Value::Null) => None,
153 _ => continue,
154 };
155 let entity_id = entity.id.raw();
156 match latest.get(encoded_name) {
157 Some((existing_id, _)) if *existing_id > entity_id => {}
158 _ => {
159 latest.insert(encoded_name.to_string(), (entity_id, value));
160 }
161 }
162 }
163
164 latest
165 .into_iter()
166 .filter_map(|(encoded_name, (_, value))| {
167 let value = value?;
168 let bytes = hex::decode(encoded_name).ok()?;
169 let name = String::from_utf8(bytes).ok()?;
170 Some((name, value))
171 })
172 .collect()
173 }
174
175 fn persist_probabilistic_blob(
176 &self,
177 prefix: &str,
178 name: &str,
179 bytes: &[u8],
180 ) -> RedDBResult<()> {
181 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
182 self.inner
183 .db
184 .store()
185 .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
186 Ok(())
187 }
188
189 fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
190 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
191 self.inner
192 .db
193 .store()
194 .set_config_tree(&key, &crate::serde_json::Value::Null);
195 Ok(())
196 }
197
198 fn create_probabilistic_catalog_entry(
199 &self,
200 name: &str,
201 model: crate::catalog::CollectionModel,
202 ) -> RedDBResult<()> {
203 let store = self.inner.db.store();
204 store
205 .create_collection(name)
206 .map_err(|err| RedDBError::Internal(err.to_string()))?;
207 self.inner
208 .db
209 .save_collection_contract(probabilistic_collection_contract(name, model))
210 .map_err(|err| RedDBError::Internal(err.to_string()))?;
211 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
212 store.set_config_tree(
213 &format!("red.collection_tenants.{name}"),
214 &crate::serde_json::Value::String(tenant_id),
215 );
216 }
217 self.inner
218 .db
219 .persist_metadata()
220 .map_err(|err| RedDBError::Internal(err.to_string()))?;
221 self.invalidate_result_cache();
222 Ok(())
223 }
224
225 fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
226 let store = self.inner.db.store();
227 if store.get_collection(name).is_some() {
228 store
229 .drop_collection(name)
230 .map_err(|err| RedDBError::Internal(err.to_string()))?;
231 }
232 self.inner
233 .db
234 .remove_collection_contract(name)
235 .map_err(|err| RedDBError::Internal(err.to_string()))?;
236 self.inner
237 .db
238 .persist_metadata()
239 .map_err(|err| RedDBError::Internal(err.to_string()))?;
240 self.invalidate_result_cache();
241 Ok(())
242 }
243
244 pub(crate) fn execute_probabilistic_select(
245 &self,
246 query: &TableQuery,
247 ) -> RedDBResult<Option<UnifiedResult>> {
248 let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
249 let mut read_projections = Vec::new();
250 for projection in &projections {
251 if let Some(read_projection) =
252 parse_probabilistic_read_projection(projection, read_projections.len())?
253 {
254 read_projections.push(read_projection);
255 }
256 }
257
258 let Some(actual_model) = self
259 .inner
260 .db
261 .collection_contract(&query.table)
262 .map(|contract| contract.declared_model)
263 else {
264 return if read_projections.is_empty() {
265 Ok(None)
266 } else {
267 Err(RedDBError::NotFound(format!(
268 "probabilistic collection '{}' not found",
269 query.table
270 )))
271 };
272 };
273
274 let is_probabilistic_model = matches!(
275 actual_model,
276 crate::catalog::CollectionModel::Hll
277 | crate::catalog::CollectionModel::Sketch
278 | crate::catalog::CollectionModel::Filter
279 );
280 if read_projections.is_empty() {
281 return if is_probabilistic_model {
282 Err(RedDBError::Query(format!(
283 "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
284 query.table
285 )))
286 } else {
287 Ok(None)
288 };
289 }
290
291 validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
292 let (columns, record) =
293 self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
294 let mut result = UnifiedResult::with_columns(columns);
295 if probabilistic_select_row_visible(self, query, &record) {
296 result.push(record);
297 }
298 Ok(Some(result))
299 }
300
301 pub fn execute_probabilistic_command(
302 &self,
303 raw_query: &str,
304 cmd: &ProbabilisticCommand,
305 ) -> RedDBResult<RuntimeQueryResult> {
306 let is_mutation = matches!(
310 cmd,
311 ProbabilisticCommand::CreateHll { .. }
312 | ProbabilisticCommand::HllAdd { .. }
313 | ProbabilisticCommand::HllMerge { .. }
314 | ProbabilisticCommand::DropHll { .. }
315 | ProbabilisticCommand::CreateSketch { .. }
316 | ProbabilisticCommand::SketchAdd { .. }
317 | ProbabilisticCommand::SketchMerge { .. }
318 | ProbabilisticCommand::DropSketch { .. }
319 | ProbabilisticCommand::CreateFilter { .. }
320 | ProbabilisticCommand::FilterAdd { .. }
321 | ProbabilisticCommand::FilterDelete { .. }
322 | ProbabilisticCommand::DropFilter { .. }
323 );
324 if is_mutation {
325 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
326 }
327 match cmd {
328 ProbabilisticCommand::CreateHll {
330 name,
331 precision,
332 if_not_exists,
333 } => {
334 let mut hlls =
335 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
336 if hlls.contains_key(name) {
337 if *if_not_exists {
338 return Ok(RuntimeQueryResult::ok_message(
339 raw_query.to_string(),
340 &format!("HLL '{}' already exists", name),
341 "create",
342 ));
343 }
344 return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
345 }
346 let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
347 *precision,
348 )
349 .ok_or_else(|| {
350 RedDBError::Query(format!(
351 "HLL precision must be between 4 and 18, got {precision}"
352 ))
353 })?;
354 self.create_probabilistic_catalog_entry(
355 name,
356 crate::catalog::CollectionModel::Hll,
357 )?;
358 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
359 hlls.insert(name.clone(), hll);
360 Ok(RuntimeQueryResult::ok_message(
361 raw_query.to_string(),
362 &format!("HLL '{}' created", name),
363 "create",
364 ))
365 }
366 ProbabilisticCommand::HllAdd { name, elements } => {
367 let mut hlls =
368 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
369 let hll = hlls
370 .get_mut(name)
371 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
372 for elem in elements {
373 hll.add(elem.as_bytes());
374 }
375 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
376 Ok(RuntimeQueryResult::ok_message(
377 raw_query.to_string(),
378 &format!("{} element(s) added to HLL '{}'", elements.len(), name),
379 "insert",
380 ))
381 }
382 ProbabilisticCommand::HllCount { names } => {
383 let hlls =
384 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
385 if names.len() == 1 {
386 let hll = hlls.get(&names[0]).ok_or_else(|| {
387 RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
388 })?;
389 let count = hll.count();
390 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
391 let mut record = UnifiedRecord::new();
392 record.set("count", Value::UnsignedInteger(count));
393 result.push(record);
394 Ok(RuntimeQueryResult {
395 query: raw_query.to_string(),
396 mode: QueryMode::Sql,
397 statement: "hll_count",
398 engine: "runtime-probabilistic",
399 result,
400 affected_rows: 0,
401 statement_type: "select",
402 bookmark: None,
403 })
404 } else {
405 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
407 for name in names {
408 let hll = hlls.get(name).ok_or_else(|| {
409 RedDBError::NotFound(format!("HLL '{}' not found", name))
410 })?;
411 merged.merge(hll);
412 }
413 let count = merged.count();
414 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
415 let mut record = UnifiedRecord::new();
416 record.set("count", Value::UnsignedInteger(count));
417 result.push(record);
418 Ok(RuntimeQueryResult {
419 query: raw_query.to_string(),
420 mode: QueryMode::Sql,
421 statement: "hll_count",
422 engine: "runtime-probabilistic",
423 result,
424 affected_rows: 0,
425 statement_type: "select",
426 bookmark: None,
427 })
428 }
429 }
430 ProbabilisticCommand::HllMerge { dest, sources } => {
431 let mut hlls =
432 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
433 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
434 for src in sources {
435 let hll = hlls
436 .get(src)
437 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
438 merged.merge(hll);
439 }
440 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
441 hlls.insert(dest.clone(), merged);
442 Ok(RuntimeQueryResult::ok_message(
443 raw_query.to_string(),
444 &format!(
445 "HLL '{}' created from merge of {}",
446 dest,
447 sources.join(", ")
448 ),
449 "create",
450 ))
451 }
452 ProbabilisticCommand::HllInfo { name } => {
453 let hlls =
454 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
455 let hll = hlls
456 .get(name)
457 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
458 let mut result = UnifiedResult::with_columns(vec![
459 "name".into(),
460 "precision".into(),
461 "count".into(),
462 "memory_bytes".into(),
463 ]);
464 let mut record = UnifiedRecord::new();
465 record.set("name", Value::text(name.clone()));
466 record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
467 record.set("count", Value::UnsignedInteger(hll.count()));
468 record.set(
469 "memory_bytes",
470 Value::UnsignedInteger(hll.memory_bytes() as u64),
471 );
472 result.push(record);
473 Ok(RuntimeQueryResult {
474 query: raw_query.to_string(),
475 mode: QueryMode::Sql,
476 statement: "hll_info",
477 engine: "runtime-probabilistic",
478 result,
479 affected_rows: 0,
480 statement_type: "select",
481 bookmark: None,
482 })
483 }
484 ProbabilisticCommand::DropHll { name, if_exists } => {
485 let mut hlls =
486 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
487 if hlls.remove(name).is_none() {
488 if *if_exists {
489 return Ok(RuntimeQueryResult::ok_message(
490 raw_query.to_string(),
491 &format!("HLL '{}' does not exist", name),
492 "drop",
493 ));
494 }
495 return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
496 }
497 self.drop_probabilistic_catalog_entry(name)?;
498 self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
499 Ok(RuntimeQueryResult::ok_message(
500 raw_query.to_string(),
501 &format!("HLL '{}' dropped", name),
502 "drop",
503 ))
504 }
505
506 ProbabilisticCommand::CreateSketch {
508 name,
509 width,
510 depth,
511 if_not_exists,
512 } => {
513 let mut sketches = probabilistic_write(
514 &self.inner.probabilistic.sketches,
515 "probabilistic sketch store",
516 );
517 if sketches.contains_key(name) {
518 if *if_not_exists {
519 return Ok(RuntimeQueryResult::ok_message(
520 raw_query.to_string(),
521 &format!("SKETCH '{}' already exists", name),
522 "create",
523 ));
524 }
525 return Err(RedDBError::Query(format!(
526 "SKETCH '{}' already exists",
527 name
528 )));
529 }
530 self.create_probabilistic_catalog_entry(
531 name,
532 crate::catalog::CollectionModel::Sketch,
533 )?;
534 let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
535 *width, *depth,
536 );
537 self.persist_probabilistic_blob(
538 PROB_SKETCH_STATE_PREFIX,
539 name,
540 &sketch.as_bytes(),
541 )?;
542 sketches.insert(name.clone(), sketch);
543 Ok(RuntimeQueryResult::ok_message(
544 raw_query.to_string(),
545 &format!(
546 "SKETCH '{}' created (width={}, depth={})",
547 name, width, depth
548 ),
549 "create",
550 ))
551 }
552 ProbabilisticCommand::SketchAdd {
553 name,
554 element,
555 count,
556 } => {
557 let mut sketches = probabilistic_write(
558 &self.inner.probabilistic.sketches,
559 "probabilistic sketch store",
560 );
561 let sketch = sketches
562 .get_mut(name)
563 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
564 sketch.add(element.as_bytes(), *count);
565 self.persist_probabilistic_blob(
566 PROB_SKETCH_STATE_PREFIX,
567 name,
568 &sketch.as_bytes(),
569 )?;
570 Ok(RuntimeQueryResult::ok_message(
571 raw_query.to_string(),
572 &format!("added {} to SKETCH '{}'", count, name),
573 "insert",
574 ))
575 }
576 ProbabilisticCommand::SketchCount { name, element } => {
577 let sketches = probabilistic_read(
578 &self.inner.probabilistic.sketches,
579 "probabilistic sketch store",
580 );
581 let sketch = sketches
582 .get(name)
583 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
584 let estimate = sketch.estimate(element.as_bytes());
585 let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
586 let mut record = UnifiedRecord::new();
587 record.set("estimate", Value::UnsignedInteger(estimate));
588 result.push(record);
589 Ok(RuntimeQueryResult {
590 query: raw_query.to_string(),
591 mode: QueryMode::Sql,
592 statement: "sketch_count",
593 engine: "runtime-probabilistic",
594 result,
595 affected_rows: 0,
596 statement_type: "select",
597 bookmark: None,
598 })
599 }
600 ProbabilisticCommand::SketchMerge { dest, sources } => {
601 let mut sketches = probabilistic_write(
602 &self.inner.probabilistic.sketches,
603 "probabilistic sketch store",
604 );
605 let first_src = sketches.get(&sources[0]).ok_or_else(|| {
606 RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
607 })?;
608 let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
609 first_src.width(),
610 first_src.depth(),
611 );
612 for src in sources {
613 let sketch = sketches.get(src).ok_or_else(|| {
614 RedDBError::NotFound(format!("SKETCH '{}' not found", src))
615 })?;
616 if !merged.merge(sketch) {
617 return Err(RedDBError::Query(format!(
618 "SKETCH '{}' has incompatible dimensions",
619 src
620 )));
621 }
622 }
623 self.persist_probabilistic_blob(
624 PROB_SKETCH_STATE_PREFIX,
625 dest,
626 &merged.as_bytes(),
627 )?;
628 sketches.insert(dest.clone(), merged);
629 Ok(RuntimeQueryResult::ok_message(
630 raw_query.to_string(),
631 &format!(
632 "SKETCH '{}' created from merge of {}",
633 dest,
634 sources.join(", ")
635 ),
636 "create",
637 ))
638 }
639 ProbabilisticCommand::SketchInfo { name } => {
640 let sketches = probabilistic_read(
641 &self.inner.probabilistic.sketches,
642 "probabilistic sketch store",
643 );
644 let sketch = sketches
645 .get(name)
646 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
647 let mut result = UnifiedResult::with_columns(vec![
648 "name".into(),
649 "width".into(),
650 "depth".into(),
651 "total".into(),
652 "memory_bytes".into(),
653 ]);
654 let mut record = UnifiedRecord::new();
655 record.set("name", Value::text(name.clone()));
656 record.set("width", Value::UnsignedInteger(sketch.width() as u64));
657 record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
658 record.set("total", Value::UnsignedInteger(sketch.total()));
659 record.set(
660 "memory_bytes",
661 Value::UnsignedInteger(sketch.memory_bytes() as u64),
662 );
663 result.push(record);
664 Ok(RuntimeQueryResult {
665 query: raw_query.to_string(),
666 mode: QueryMode::Sql,
667 statement: "sketch_info",
668 engine: "runtime-probabilistic",
669 result,
670 affected_rows: 0,
671 statement_type: "select",
672 bookmark: None,
673 })
674 }
675 ProbabilisticCommand::DropSketch { name, if_exists } => {
676 let mut sketches = probabilistic_write(
677 &self.inner.probabilistic.sketches,
678 "probabilistic sketch store",
679 );
680 if sketches.remove(name).is_none() {
681 if *if_exists {
682 return Ok(RuntimeQueryResult::ok_message(
683 raw_query.to_string(),
684 &format!("SKETCH '{}' does not exist", name),
685 "drop",
686 ));
687 }
688 return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
689 }
690 self.drop_probabilistic_catalog_entry(name)?;
691 self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
692 Ok(RuntimeQueryResult::ok_message(
693 raw_query.to_string(),
694 &format!("SKETCH '{}' dropped", name),
695 "drop",
696 ))
697 }
698
699 ProbabilisticCommand::CreateFilter {
701 name,
702 capacity,
703 if_not_exists,
704 } => {
705 let mut filters = probabilistic_write(
706 &self.inner.probabilistic.filters,
707 "probabilistic filter store",
708 );
709 if filters.contains_key(name) {
710 if *if_not_exists {
711 return Ok(RuntimeQueryResult::ok_message(
712 raw_query.to_string(),
713 &format!("FILTER '{}' already exists", name),
714 "create",
715 ));
716 }
717 return Err(RedDBError::Query(format!(
718 "FILTER '{}' already exists",
719 name
720 )));
721 }
722 self.create_probabilistic_catalog_entry(
723 name,
724 crate::catalog::CollectionModel::Filter,
725 )?;
726 let filter =
727 crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
728 self.persist_probabilistic_blob(
729 PROB_FILTER_STATE_PREFIX,
730 name,
731 &filter.as_bytes(),
732 )?;
733 filters.insert(name.clone(), filter);
734 Ok(RuntimeQueryResult::ok_message(
735 raw_query.to_string(),
736 &format!("FILTER '{}' created (capacity={})", name, capacity),
737 "create",
738 ))
739 }
740 ProbabilisticCommand::FilterAdd { name, element } => {
741 let mut filters = probabilistic_write(
742 &self.inner.probabilistic.filters,
743 "probabilistic filter store",
744 );
745 let filter = filters
746 .get_mut(name)
747 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
748 if !filter.insert(element.as_bytes()) {
749 return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
750 }
751 self.persist_probabilistic_blob(
752 PROB_FILTER_STATE_PREFIX,
753 name,
754 &filter.as_bytes(),
755 )?;
756 Ok(RuntimeQueryResult::ok_message(
757 raw_query.to_string(),
758 &format!("element added to FILTER '{}'", name),
759 "insert",
760 ))
761 }
762 ProbabilisticCommand::FilterCheck { name, element } => {
763 let filters = probabilistic_read(
764 &self.inner.probabilistic.filters,
765 "probabilistic filter store",
766 );
767 let filter = filters
768 .get(name)
769 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
770 let exists = filter.contains(element.as_bytes());
771 let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
772 let mut record = UnifiedRecord::new();
773 record.set("exists", Value::Boolean(exists));
774 result.push(record);
775 Ok(RuntimeQueryResult {
776 query: raw_query.to_string(),
777 mode: QueryMode::Sql,
778 statement: "filter_check",
779 engine: "runtime-probabilistic",
780 result,
781 affected_rows: 0,
782 statement_type: "select",
783 bookmark: None,
784 })
785 }
786 ProbabilisticCommand::FilterDelete { name, element } => {
787 let mut filters = probabilistic_write(
788 &self.inner.probabilistic.filters,
789 "probabilistic filter store",
790 );
791 let filter = filters
792 .get_mut(name)
793 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
794 let removed = filter.delete(element.as_bytes());
795 self.persist_probabilistic_blob(
796 PROB_FILTER_STATE_PREFIX,
797 name,
798 &filter.as_bytes(),
799 )?;
800 Ok(RuntimeQueryResult::ok_message(
801 raw_query.to_string(),
802 &format!(
803 "element {} from FILTER '{}'",
804 if removed { "deleted" } else { "not found in" },
805 name
806 ),
807 "delete",
808 ))
809 }
810 ProbabilisticCommand::FilterCount { name } => {
811 let filters = probabilistic_read(
812 &self.inner.probabilistic.filters,
813 "probabilistic filter store",
814 );
815 let filter = filters
816 .get(name)
817 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
818 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
819 let mut record = UnifiedRecord::new();
820 record.set("count", Value::UnsignedInteger(filter.count() as u64));
821 result.push(record);
822 Ok(RuntimeQueryResult {
823 query: raw_query.to_string(),
824 mode: QueryMode::Sql,
825 statement: "filter_count",
826 engine: "runtime-probabilistic",
827 result,
828 affected_rows: 0,
829 statement_type: "select",
830 bookmark: None,
831 })
832 }
833 ProbabilisticCommand::FilterInfo { name } => {
834 let filters = probabilistic_read(
835 &self.inner.probabilistic.filters,
836 "probabilistic filter store",
837 );
838 let filter = filters
839 .get(name)
840 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
841 let mut result = UnifiedResult::with_columns(vec![
842 "name".into(),
843 "count".into(),
844 "load_factor".into(),
845 "memory_bytes".into(),
846 ]);
847 let mut record = UnifiedRecord::new();
848 record.set("name", Value::text(name.clone()));
849 record.set("count", Value::UnsignedInteger(filter.count() as u64));
850 record.set("load_factor", Value::Float(filter.load_factor()));
851 record.set(
852 "memory_bytes",
853 Value::UnsignedInteger(filter.memory_bytes() as u64),
854 );
855 result.push(record);
856 Ok(RuntimeQueryResult {
857 query: raw_query.to_string(),
858 mode: QueryMode::Sql,
859 statement: "filter_info",
860 engine: "runtime-probabilistic",
861 result,
862 affected_rows: 0,
863 statement_type: "select",
864 bookmark: None,
865 })
866 }
867 ProbabilisticCommand::DropFilter { name, if_exists } => {
868 let mut filters = probabilistic_write(
869 &self.inner.probabilistic.filters,
870 "probabilistic filter store",
871 );
872 if filters.remove(name).is_none() {
873 if *if_exists {
874 return Ok(RuntimeQueryResult::ok_message(
875 raw_query.to_string(),
876 &format!("FILTER '{}' does not exist", name),
877 "drop",
878 ));
879 }
880 return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
881 }
882 self.drop_probabilistic_catalog_entry(name)?;
883 self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
884 Ok(RuntimeQueryResult::ok_message(
885 raw_query.to_string(),
886 &format!("FILTER '{}' dropped", name),
887 "drop",
888 ))
889 }
890 }
891 }
892}
893
894fn parse_probabilistic_read_projection(
895 projection: &Projection,
896 index: usize,
897) -> RedDBResult<Option<ProbabilisticReadProjection>> {
898 if let Some(column) = projection_unqualified_column(projection) {
899 if column.eq_ignore_ascii_case("CARDINALITY") {
900 return Ok(Some(ProbabilisticReadProjection::Cardinality {
901 label: probabilistic_projection_label(projection, "cardinality", index),
902 }));
903 }
904 }
905
906 let Some((function, args)) = projection_function(projection) else {
907 return Ok(None);
908 };
909 if function.eq_ignore_ascii_case("FREQ") {
910 let element = projection_single_text_arg(function, args)?;
911 return Ok(Some(ProbabilisticReadProjection::Freq {
912 element,
913 label: probabilistic_projection_label(projection, "freq", index),
914 }));
915 }
916 if function.eq_ignore_ascii_case("CONTAINS") {
917 let element = projection_single_text_arg(function, args)?;
918 return Ok(Some(ProbabilisticReadProjection::Contains {
919 element,
920 label: probabilistic_projection_label(projection, "contains", index),
921 }));
922 }
923
924 Ok(None)
925}
926
927fn validate_probabilistic_read_model(
928 collection: &str,
929 actual_model: crate::catalog::CollectionModel,
930 projections: &[ProbabilisticReadProjection],
931) -> RedDBResult<()> {
932 for projection in projections {
933 let expected_model = match projection {
934 ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
935 ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
936 ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
937 };
938 if actual_model != expected_model {
939 return Err(RedDBError::Query(format!(
940 "{} is only supported for {} collections; '{}' is {}",
941 probabilistic_projection_form(projection),
942 crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
943 collection,
944 crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
945 )));
946 }
947 }
948 Ok(())
949}
950
951impl RedDBRuntime {
952 fn materialize_probabilistic_select_row(
953 &self,
954 collection: &str,
955 projections: &[ProbabilisticReadProjection],
956 ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
957 let mut columns = Vec::with_capacity(projections.len());
958 let mut record = UnifiedRecord::new();
959 for projection in projections {
960 match projection {
961 ProbabilisticReadProjection::Cardinality { label } => {
962 let hlls = probabilistic_read(
963 &self.inner.probabilistic.hlls,
964 "probabilistic HLL store",
965 );
966 let hll = hlls.get(collection).ok_or_else(|| {
967 RedDBError::NotFound(format!("HLL '{}' not found", collection))
968 })?;
969 columns.push(label.clone());
970 record.set(label, Value::UnsignedInteger(hll.count()));
971 }
972 ProbabilisticReadProjection::Freq { element, label } => {
973 let sketches = probabilistic_read(
974 &self.inner.probabilistic.sketches,
975 "probabilistic sketch store",
976 );
977 let sketch = sketches.get(collection).ok_or_else(|| {
978 RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
979 })?;
980 columns.push(label.clone());
981 record.set(
982 label,
983 Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
984 );
985 }
986 ProbabilisticReadProjection::Contains { element, label } => {
987 let filters = probabilistic_read(
988 &self.inner.probabilistic.filters,
989 "probabilistic filter store",
990 );
991 let filter = filters.get(collection).ok_or_else(|| {
992 RedDBError::NotFound(format!("FILTER '{}' not found", collection))
993 })?;
994 columns.push(label.clone());
995 record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
996 }
997 }
998 }
999 Ok((columns, record))
1000 }
1001}
1002
1003fn probabilistic_select_row_visible(
1004 runtime: &RedDBRuntime,
1005 query: &TableQuery,
1006 record: &UnifiedRecord,
1007) -> bool {
1008 if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
1009 return false;
1010 }
1011 let table_name = query.table.as_str();
1012 let table_alias = query.alias.as_deref().unwrap_or(table_name);
1013 crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
1014 super::join_filter::evaluate_runtime_filter_with_db(
1015 Some(&runtime.inner.db),
1016 record,
1017 &filter,
1018 Some(table_name),
1019 Some(table_alias),
1020 )
1021 })
1022}
1023
1024fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1025 match projection {
1026 Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1027 Some(column.as_str())
1028 }
1029 Projection::Column(column) => Some(column.as_str()),
1030 Projection::Alias(column, _) => Some(column.as_str()),
1031 _ => None,
1032 }
1033}
1034
1035fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1036 match projection {
1037 Projection::Function(name, args) => {
1038 let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1039 Some((function, args.as_slice()))
1040 }
1041 _ => None,
1042 }
1043}
1044
1045fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1046 if args.len() != 1 {
1047 return Err(RedDBError::Query(format!(
1048 "{function}(...) expects exactly one string literal"
1049 )));
1050 }
1051 match &args[0] {
1052 Projection::Column(column) => column
1053 .strip_prefix("LIT:")
1054 .map(ToString::to_string)
1055 .ok_or_else(|| {
1056 RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1057 }),
1058 _ => Err(RedDBError::Query(format!(
1059 "{function}(...) expects a string literal argument"
1060 ))),
1061 }
1062}
1063
1064fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1065 match projection {
1066 Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1067 if alias.eq_ignore_ascii_case(column) =>
1068 {
1069 numbered_probabilistic_label(base, index)
1070 }
1071 Projection::Field(_, Some(alias)) => alias.clone(),
1072 Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1073 numbered_probabilistic_label(base, index)
1074 }
1075 Projection::Alias(_, alias) => alias.clone(),
1076 Projection::Function(name, _) => name
1077 .split_once(':')
1078 .map(|(_, alias)| {
1079 if is_generated_probabilistic_function_label(alias, base) {
1080 numbered_probabilistic_label(base, index)
1081 } else {
1082 alias.to_string()
1083 }
1084 })
1085 .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1086 _ => numbered_probabilistic_label(base, index),
1087 }
1088}
1089
/// Reports whether `alias` looks like an auto-generated function label:
/// `base` (compared ASCII case-insensitively) immediately followed by `(`.
///
/// Aliases shorter than `base`, or whose byte at `base.len()` is not on a
/// character boundary, are never considered generated.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    let split_at = base.len();
    match (alias.get(..split_at), alias.get(split_at..)) {
        (Some(head), Some(tail)) => head.eq_ignore_ascii_case(base) && tail.starts_with('('),
        _ => false,
    }
}
1096
/// Produces the default label for the projection at `index`: the bare `base`
/// for the first one, and `<base>_<index + 1>` for every later one.
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_string(),
        position => format!("{base}_{}", position + 1),
    }
}
1104
1105fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1106 match projection {
1107 ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1108 ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1109 ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1110 }
1111}