1use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
6const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
7const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
8const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11 lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15 lock.write()
16}
17
18fn probabilistic_collection_contract(
19 name: &str,
20 model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22 let now = crate::utils::now_unix_millis() as u128;
23 crate::physical::CollectionContract {
24 name: name.to_string(),
25 declared_model: model,
26 schema_mode: crate::catalog::SchemaMode::Dynamic,
27 origin: crate::physical::ContractOrigin::Explicit,
28 version: 1,
29 created_at_unix_ms: now,
30 updated_at_unix_ms: now,
31 default_ttl_ms: None,
32 vector_dimension: None,
33 vector_metric: None,
34 context_index_fields: Vec::new(),
35 declared_columns: Vec::new(),
36 table_def: None,
37 timestamps_enabled: false,
38 context_index_enabled: false,
39 metrics_raw_retention_ms: None,
40 metrics_rollup_policies: Vec::new(),
41 metrics_tenant_identity: None,
42 metrics_namespace: None,
43 append_only: false,
44 subscriptions: Vec::new(),
45 }
46}
47
/// One probabilistic read form recognised in a `SELECT` projection list.
///
/// Each variant carries the output column `label` it will be reported under;
/// `Freq` and `Contains` also carry the string-literal `element` to look up.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProbabilisticReadProjection {
    /// `SELECT CARDINALITY` against an HLL collection.
    Cardinality { label: String },
    /// `FREQ('element')` against a count-min sketch collection.
    Freq { element: String, label: String },
    /// `CONTAINS('element')` against a cuckoo filter collection.
    Contains { element: String, label: String },
}
53
54impl RedDBRuntime {
55 pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
56 {
57 let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
58 let mut hlls =
59 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
60 for (name, data_hex) in entries {
61 let bytes = hex::decode(&data_hex).map_err(|err| {
62 RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
63 })?;
64 let Some(hll) =
65 crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
66 else {
67 return Err(RedDBError::Internal(format!(
68 "invalid persisted HLL state for '{name}'"
69 )));
70 };
71 hlls.insert(name, hll);
72 }
73 }
74
75 {
76 let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
77 let mut sketches = probabilistic_write(
78 &self.inner.probabilistic.sketches,
79 "probabilistic sketch store",
80 );
81 for (name, data_hex) in entries {
82 let bytes = hex::decode(&data_hex).map_err(|err| {
83 RedDBError::Internal(format!(
84 "invalid persisted SKETCH state for '{name}': {err}"
85 ))
86 })?;
87 let sketch =
88 crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
89 &bytes,
90 )
91 .ok_or_else(|| {
92 RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
93 })?;
94 sketches.insert(name, sketch);
95 }
96 }
97
98 {
99 let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
100 let mut filters = probabilistic_write(
101 &self.inner.probabilistic.filters,
102 "probabilistic filter store",
103 );
104 for (name, data_hex) in entries {
105 let bytes = hex::decode(&data_hex).map_err(|err| {
106 RedDBError::Internal(format!(
107 "invalid persisted FILTER state for '{name}': {err}"
108 ))
109 })?;
110 let filter =
111 crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
112 .ok_or_else(|| {
113 RedDBError::Internal(format!(
114 "invalid persisted FILTER state for '{name}'"
115 ))
116 })?;
117 filters.insert(name, filter);
118 }
119 }
120
121 Ok(())
122 }
123
124 fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
125 let Some(manager) = self.inner.db.store().get_collection("red_config") else {
126 return Vec::new();
127 };
128 let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
129 std::collections::HashMap::new();
130 for entity in manager.query_all(|_| true) {
131 let EntityData::Row(row) = &entity.data else {
132 continue;
133 };
134 let Some(named) = &row.named else {
135 continue;
136 };
137 let Some(Value::Text(key)) = named.get("key") else {
138 continue;
139 };
140 let Some(encoded_name) = key.strip_prefix(prefix) else {
141 continue;
142 };
143 let value = match named.get("value") {
144 Some(Value::Text(value)) => Some(value.to_string()),
145 Some(Value::Null) => None,
146 _ => continue,
147 };
148 let entity_id = entity.id.raw();
149 match latest.get(encoded_name) {
150 Some((existing_id, _)) if *existing_id > entity_id => {}
151 _ => {
152 latest.insert(encoded_name.to_string(), (entity_id, value));
153 }
154 }
155 }
156
157 latest
158 .into_iter()
159 .filter_map(|(encoded_name, (_, value))| {
160 let value = value?;
161 let bytes = hex::decode(encoded_name).ok()?;
162 let name = String::from_utf8(bytes).ok()?;
163 Some((name, value))
164 })
165 .collect()
166 }
167
168 fn persist_probabilistic_blob(
169 &self,
170 prefix: &str,
171 name: &str,
172 bytes: &[u8],
173 ) -> RedDBResult<()> {
174 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
175 self.inner
176 .db
177 .store()
178 .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
179 Ok(())
180 }
181
182 fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
183 let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
184 self.inner
185 .db
186 .store()
187 .set_config_tree(&key, &crate::serde_json::Value::Null);
188 Ok(())
189 }
190
191 fn create_probabilistic_catalog_entry(
192 &self,
193 name: &str,
194 model: crate::catalog::CollectionModel,
195 ) -> RedDBResult<()> {
196 let store = self.inner.db.store();
197 store
198 .create_collection(name)
199 .map_err(|err| RedDBError::Internal(err.to_string()))?;
200 self.inner
201 .db
202 .save_collection_contract(probabilistic_collection_contract(name, model))
203 .map_err(|err| RedDBError::Internal(err.to_string()))?;
204 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
205 store.set_config_tree(
206 &format!("red.collection_tenants.{name}"),
207 &crate::serde_json::Value::String(tenant_id),
208 );
209 }
210 self.inner
211 .db
212 .persist_metadata()
213 .map_err(|err| RedDBError::Internal(err.to_string()))?;
214 self.invalidate_result_cache();
215 Ok(())
216 }
217
218 fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
219 let store = self.inner.db.store();
220 if store.get_collection(name).is_some() {
221 store
222 .drop_collection(name)
223 .map_err(|err| RedDBError::Internal(err.to_string()))?;
224 }
225 self.inner
226 .db
227 .remove_collection_contract(name)
228 .map_err(|err| RedDBError::Internal(err.to_string()))?;
229 self.inner
230 .db
231 .persist_metadata()
232 .map_err(|err| RedDBError::Internal(err.to_string()))?;
233 self.invalidate_result_cache();
234 Ok(())
235 }
236
237 pub(crate) fn execute_probabilistic_select(
238 &self,
239 query: &TableQuery,
240 ) -> RedDBResult<Option<UnifiedResult>> {
241 let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
242 let mut read_projections = Vec::new();
243 for projection in &projections {
244 if let Some(read_projection) =
245 parse_probabilistic_read_projection(projection, read_projections.len())?
246 {
247 read_projections.push(read_projection);
248 }
249 }
250
251 let Some(actual_model) = self
252 .inner
253 .db
254 .collection_contract(&query.table)
255 .map(|contract| contract.declared_model)
256 else {
257 return if read_projections.is_empty() {
258 Ok(None)
259 } else {
260 Err(RedDBError::NotFound(format!(
261 "probabilistic collection '{}' not found",
262 query.table
263 )))
264 };
265 };
266
267 let is_probabilistic_model = matches!(
268 actual_model,
269 crate::catalog::CollectionModel::Hll
270 | crate::catalog::CollectionModel::Sketch
271 | crate::catalog::CollectionModel::Filter
272 );
273 if read_projections.is_empty() {
274 return if is_probabilistic_model {
275 Err(RedDBError::Query(format!(
276 "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
277 query.table
278 )))
279 } else {
280 Ok(None)
281 };
282 }
283
284 validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
285 let (columns, record) =
286 self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
287 let mut result = UnifiedResult::with_columns(columns);
288 if probabilistic_select_row_visible(self, query, &record) {
289 result.push(record);
290 }
291 Ok(Some(result))
292 }
293
294 pub fn execute_probabilistic_command(
295 &self,
296 raw_query: &str,
297 cmd: &ProbabilisticCommand,
298 ) -> RedDBResult<RuntimeQueryResult> {
299 let is_mutation = matches!(
303 cmd,
304 ProbabilisticCommand::CreateHll { .. }
305 | ProbabilisticCommand::HllAdd { .. }
306 | ProbabilisticCommand::HllMerge { .. }
307 | ProbabilisticCommand::DropHll { .. }
308 | ProbabilisticCommand::CreateSketch { .. }
309 | ProbabilisticCommand::SketchAdd { .. }
310 | ProbabilisticCommand::SketchMerge { .. }
311 | ProbabilisticCommand::DropSketch { .. }
312 | ProbabilisticCommand::CreateFilter { .. }
313 | ProbabilisticCommand::FilterAdd { .. }
314 | ProbabilisticCommand::FilterDelete { .. }
315 | ProbabilisticCommand::DropFilter { .. }
316 );
317 if is_mutation {
318 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
319 }
320 match cmd {
321 ProbabilisticCommand::CreateHll {
323 name,
324 precision,
325 if_not_exists,
326 } => {
327 let mut hlls =
328 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
329 if hlls.contains_key(name) {
330 if *if_not_exists {
331 return Ok(RuntimeQueryResult::ok_message(
332 raw_query.to_string(),
333 &format!("HLL '{}' already exists", name),
334 "create",
335 ));
336 }
337 return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
338 }
339 let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
340 *precision,
341 )
342 .ok_or_else(|| {
343 RedDBError::Query(format!(
344 "HLL precision must be between 4 and 18, got {precision}"
345 ))
346 })?;
347 self.create_probabilistic_catalog_entry(
348 name,
349 crate::catalog::CollectionModel::Hll,
350 )?;
351 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
352 hlls.insert(name.clone(), hll);
353 Ok(RuntimeQueryResult::ok_message(
354 raw_query.to_string(),
355 &format!("HLL '{}' created", name),
356 "create",
357 ))
358 }
359 ProbabilisticCommand::HllAdd { name, elements } => {
360 let mut hlls =
361 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
362 let hll = hlls
363 .get_mut(name)
364 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
365 for elem in elements {
366 hll.add(elem.as_bytes());
367 }
368 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
369 Ok(RuntimeQueryResult::ok_message(
370 raw_query.to_string(),
371 &format!("{} element(s) added to HLL '{}'", elements.len(), name),
372 "insert",
373 ))
374 }
375 ProbabilisticCommand::HllCount { names } => {
376 let hlls =
377 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
378 if names.len() == 1 {
379 let hll = hlls.get(&names[0]).ok_or_else(|| {
380 RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
381 })?;
382 let count = hll.count();
383 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
384 let mut record = UnifiedRecord::new();
385 record.set("count", Value::UnsignedInteger(count));
386 result.push(record);
387 Ok(RuntimeQueryResult {
388 query: raw_query.to_string(),
389 mode: QueryMode::Sql,
390 statement: "hll_count",
391 engine: "runtime-probabilistic",
392 result,
393 affected_rows: 0,
394 statement_type: "select",
395 })
396 } else {
397 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
399 for name in names {
400 let hll = hlls.get(name).ok_or_else(|| {
401 RedDBError::NotFound(format!("HLL '{}' not found", name))
402 })?;
403 merged.merge(hll);
404 }
405 let count = merged.count();
406 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
407 let mut record = UnifiedRecord::new();
408 record.set("count", Value::UnsignedInteger(count));
409 result.push(record);
410 Ok(RuntimeQueryResult {
411 query: raw_query.to_string(),
412 mode: QueryMode::Sql,
413 statement: "hll_count",
414 engine: "runtime-probabilistic",
415 result,
416 affected_rows: 0,
417 statement_type: "select",
418 })
419 }
420 }
421 ProbabilisticCommand::HllMerge { dest, sources } => {
422 let mut hlls =
423 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
424 let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
425 for src in sources {
426 let hll = hlls
427 .get(src)
428 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
429 merged.merge(hll);
430 }
431 self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
432 hlls.insert(dest.clone(), merged);
433 Ok(RuntimeQueryResult::ok_message(
434 raw_query.to_string(),
435 &format!(
436 "HLL '{}' created from merge of {}",
437 dest,
438 sources.join(", ")
439 ),
440 "create",
441 ))
442 }
443 ProbabilisticCommand::HllInfo { name } => {
444 let hlls =
445 probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
446 let hll = hlls
447 .get(name)
448 .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
449 let mut result = UnifiedResult::with_columns(vec![
450 "name".into(),
451 "precision".into(),
452 "count".into(),
453 "memory_bytes".into(),
454 ]);
455 let mut record = UnifiedRecord::new();
456 record.set("name", Value::text(name.clone()));
457 record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
458 record.set("count", Value::UnsignedInteger(hll.count()));
459 record.set(
460 "memory_bytes",
461 Value::UnsignedInteger(hll.memory_bytes() as u64),
462 );
463 result.push(record);
464 Ok(RuntimeQueryResult {
465 query: raw_query.to_string(),
466 mode: QueryMode::Sql,
467 statement: "hll_info",
468 engine: "runtime-probabilistic",
469 result,
470 affected_rows: 0,
471 statement_type: "select",
472 })
473 }
474 ProbabilisticCommand::DropHll { name, if_exists } => {
475 let mut hlls =
476 probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
477 if hlls.remove(name).is_none() {
478 if *if_exists {
479 return Ok(RuntimeQueryResult::ok_message(
480 raw_query.to_string(),
481 &format!("HLL '{}' does not exist", name),
482 "drop",
483 ));
484 }
485 return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
486 }
487 self.drop_probabilistic_catalog_entry(name)?;
488 self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
489 Ok(RuntimeQueryResult::ok_message(
490 raw_query.to_string(),
491 &format!("HLL '{}' dropped", name),
492 "drop",
493 ))
494 }
495
496 ProbabilisticCommand::CreateSketch {
498 name,
499 width,
500 depth,
501 if_not_exists,
502 } => {
503 let mut sketches = probabilistic_write(
504 &self.inner.probabilistic.sketches,
505 "probabilistic sketch store",
506 );
507 if sketches.contains_key(name) {
508 if *if_not_exists {
509 return Ok(RuntimeQueryResult::ok_message(
510 raw_query.to_string(),
511 &format!("SKETCH '{}' already exists", name),
512 "create",
513 ));
514 }
515 return Err(RedDBError::Query(format!(
516 "SKETCH '{}' already exists",
517 name
518 )));
519 }
520 self.create_probabilistic_catalog_entry(
521 name,
522 crate::catalog::CollectionModel::Sketch,
523 )?;
524 let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
525 *width, *depth,
526 );
527 self.persist_probabilistic_blob(
528 PROB_SKETCH_STATE_PREFIX,
529 name,
530 &sketch.as_bytes(),
531 )?;
532 sketches.insert(name.clone(), sketch);
533 Ok(RuntimeQueryResult::ok_message(
534 raw_query.to_string(),
535 &format!(
536 "SKETCH '{}' created (width={}, depth={})",
537 name, width, depth
538 ),
539 "create",
540 ))
541 }
542 ProbabilisticCommand::SketchAdd {
543 name,
544 element,
545 count,
546 } => {
547 let mut sketches = probabilistic_write(
548 &self.inner.probabilistic.sketches,
549 "probabilistic sketch store",
550 );
551 let sketch = sketches
552 .get_mut(name)
553 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
554 sketch.add(element.as_bytes(), *count);
555 self.persist_probabilistic_blob(
556 PROB_SKETCH_STATE_PREFIX,
557 name,
558 &sketch.as_bytes(),
559 )?;
560 Ok(RuntimeQueryResult::ok_message(
561 raw_query.to_string(),
562 &format!("added {} to SKETCH '{}'", count, name),
563 "insert",
564 ))
565 }
566 ProbabilisticCommand::SketchCount { name, element } => {
567 let sketches = probabilistic_read(
568 &self.inner.probabilistic.sketches,
569 "probabilistic sketch store",
570 );
571 let sketch = sketches
572 .get(name)
573 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
574 let estimate = sketch.estimate(element.as_bytes());
575 let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
576 let mut record = UnifiedRecord::new();
577 record.set("estimate", Value::UnsignedInteger(estimate));
578 result.push(record);
579 Ok(RuntimeQueryResult {
580 query: raw_query.to_string(),
581 mode: QueryMode::Sql,
582 statement: "sketch_count",
583 engine: "runtime-probabilistic",
584 result,
585 affected_rows: 0,
586 statement_type: "select",
587 })
588 }
589 ProbabilisticCommand::SketchMerge { dest, sources } => {
590 let mut sketches = probabilistic_write(
591 &self.inner.probabilistic.sketches,
592 "probabilistic sketch store",
593 );
594 let first_src = sketches.get(&sources[0]).ok_or_else(|| {
595 RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
596 })?;
597 let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
598 first_src.width(),
599 first_src.depth(),
600 );
601 for src in sources {
602 let sketch = sketches.get(src).ok_or_else(|| {
603 RedDBError::NotFound(format!("SKETCH '{}' not found", src))
604 })?;
605 if !merged.merge(sketch) {
606 return Err(RedDBError::Query(format!(
607 "SKETCH '{}' has incompatible dimensions",
608 src
609 )));
610 }
611 }
612 self.persist_probabilistic_blob(
613 PROB_SKETCH_STATE_PREFIX,
614 dest,
615 &merged.as_bytes(),
616 )?;
617 sketches.insert(dest.clone(), merged);
618 Ok(RuntimeQueryResult::ok_message(
619 raw_query.to_string(),
620 &format!(
621 "SKETCH '{}' created from merge of {}",
622 dest,
623 sources.join(", ")
624 ),
625 "create",
626 ))
627 }
628 ProbabilisticCommand::SketchInfo { name } => {
629 let sketches = probabilistic_read(
630 &self.inner.probabilistic.sketches,
631 "probabilistic sketch store",
632 );
633 let sketch = sketches
634 .get(name)
635 .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
636 let mut result = UnifiedResult::with_columns(vec![
637 "name".into(),
638 "width".into(),
639 "depth".into(),
640 "total".into(),
641 "memory_bytes".into(),
642 ]);
643 let mut record = UnifiedRecord::new();
644 record.set("name", Value::text(name.clone()));
645 record.set("width", Value::UnsignedInteger(sketch.width() as u64));
646 record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
647 record.set("total", Value::UnsignedInteger(sketch.total()));
648 record.set(
649 "memory_bytes",
650 Value::UnsignedInteger(sketch.memory_bytes() as u64),
651 );
652 result.push(record);
653 Ok(RuntimeQueryResult {
654 query: raw_query.to_string(),
655 mode: QueryMode::Sql,
656 statement: "sketch_info",
657 engine: "runtime-probabilistic",
658 result,
659 affected_rows: 0,
660 statement_type: "select",
661 })
662 }
663 ProbabilisticCommand::DropSketch { name, if_exists } => {
664 let mut sketches = probabilistic_write(
665 &self.inner.probabilistic.sketches,
666 "probabilistic sketch store",
667 );
668 if sketches.remove(name).is_none() {
669 if *if_exists {
670 return Ok(RuntimeQueryResult::ok_message(
671 raw_query.to_string(),
672 &format!("SKETCH '{}' does not exist", name),
673 "drop",
674 ));
675 }
676 return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
677 }
678 self.drop_probabilistic_catalog_entry(name)?;
679 self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
680 Ok(RuntimeQueryResult::ok_message(
681 raw_query.to_string(),
682 &format!("SKETCH '{}' dropped", name),
683 "drop",
684 ))
685 }
686
687 ProbabilisticCommand::CreateFilter {
689 name,
690 capacity,
691 if_not_exists,
692 } => {
693 let mut filters = probabilistic_write(
694 &self.inner.probabilistic.filters,
695 "probabilistic filter store",
696 );
697 if filters.contains_key(name) {
698 if *if_not_exists {
699 return Ok(RuntimeQueryResult::ok_message(
700 raw_query.to_string(),
701 &format!("FILTER '{}' already exists", name),
702 "create",
703 ));
704 }
705 return Err(RedDBError::Query(format!(
706 "FILTER '{}' already exists",
707 name
708 )));
709 }
710 self.create_probabilistic_catalog_entry(
711 name,
712 crate::catalog::CollectionModel::Filter,
713 )?;
714 let filter =
715 crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
716 self.persist_probabilistic_blob(
717 PROB_FILTER_STATE_PREFIX,
718 name,
719 &filter.as_bytes(),
720 )?;
721 filters.insert(name.clone(), filter);
722 Ok(RuntimeQueryResult::ok_message(
723 raw_query.to_string(),
724 &format!("FILTER '{}' created (capacity={})", name, capacity),
725 "create",
726 ))
727 }
728 ProbabilisticCommand::FilterAdd { name, element } => {
729 let mut filters = probabilistic_write(
730 &self.inner.probabilistic.filters,
731 "probabilistic filter store",
732 );
733 let filter = filters
734 .get_mut(name)
735 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
736 if !filter.insert(element.as_bytes()) {
737 return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
738 }
739 self.persist_probabilistic_blob(
740 PROB_FILTER_STATE_PREFIX,
741 name,
742 &filter.as_bytes(),
743 )?;
744 Ok(RuntimeQueryResult::ok_message(
745 raw_query.to_string(),
746 &format!("element added to FILTER '{}'", name),
747 "insert",
748 ))
749 }
750 ProbabilisticCommand::FilterCheck { name, element } => {
751 let filters = probabilistic_read(
752 &self.inner.probabilistic.filters,
753 "probabilistic filter store",
754 );
755 let filter = filters
756 .get(name)
757 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
758 let exists = filter.contains(element.as_bytes());
759 let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
760 let mut record = UnifiedRecord::new();
761 record.set("exists", Value::Boolean(exists));
762 result.push(record);
763 Ok(RuntimeQueryResult {
764 query: raw_query.to_string(),
765 mode: QueryMode::Sql,
766 statement: "filter_check",
767 engine: "runtime-probabilistic",
768 result,
769 affected_rows: 0,
770 statement_type: "select",
771 })
772 }
773 ProbabilisticCommand::FilterDelete { name, element } => {
774 let mut filters = probabilistic_write(
775 &self.inner.probabilistic.filters,
776 "probabilistic filter store",
777 );
778 let filter = filters
779 .get_mut(name)
780 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
781 let removed = filter.delete(element.as_bytes());
782 self.persist_probabilistic_blob(
783 PROB_FILTER_STATE_PREFIX,
784 name,
785 &filter.as_bytes(),
786 )?;
787 Ok(RuntimeQueryResult::ok_message(
788 raw_query.to_string(),
789 &format!(
790 "element {} from FILTER '{}'",
791 if removed { "deleted" } else { "not found in" },
792 name
793 ),
794 "delete",
795 ))
796 }
797 ProbabilisticCommand::FilterCount { name } => {
798 let filters = probabilistic_read(
799 &self.inner.probabilistic.filters,
800 "probabilistic filter store",
801 );
802 let filter = filters
803 .get(name)
804 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
805 let mut result = UnifiedResult::with_columns(vec!["count".into()]);
806 let mut record = UnifiedRecord::new();
807 record.set("count", Value::UnsignedInteger(filter.count() as u64));
808 result.push(record);
809 Ok(RuntimeQueryResult {
810 query: raw_query.to_string(),
811 mode: QueryMode::Sql,
812 statement: "filter_count",
813 engine: "runtime-probabilistic",
814 result,
815 affected_rows: 0,
816 statement_type: "select",
817 })
818 }
819 ProbabilisticCommand::FilterInfo { name } => {
820 let filters = probabilistic_read(
821 &self.inner.probabilistic.filters,
822 "probabilistic filter store",
823 );
824 let filter = filters
825 .get(name)
826 .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
827 let mut result = UnifiedResult::with_columns(vec![
828 "name".into(),
829 "count".into(),
830 "load_factor".into(),
831 "memory_bytes".into(),
832 ]);
833 let mut record = UnifiedRecord::new();
834 record.set("name", Value::text(name.clone()));
835 record.set("count", Value::UnsignedInteger(filter.count() as u64));
836 record.set("load_factor", Value::Float(filter.load_factor()));
837 record.set(
838 "memory_bytes",
839 Value::UnsignedInteger(filter.memory_bytes() as u64),
840 );
841 result.push(record);
842 Ok(RuntimeQueryResult {
843 query: raw_query.to_string(),
844 mode: QueryMode::Sql,
845 statement: "filter_info",
846 engine: "runtime-probabilistic",
847 result,
848 affected_rows: 0,
849 statement_type: "select",
850 })
851 }
852 ProbabilisticCommand::DropFilter { name, if_exists } => {
853 let mut filters = probabilistic_write(
854 &self.inner.probabilistic.filters,
855 "probabilistic filter store",
856 );
857 if filters.remove(name).is_none() {
858 if *if_exists {
859 return Ok(RuntimeQueryResult::ok_message(
860 raw_query.to_string(),
861 &format!("FILTER '{}' does not exist", name),
862 "drop",
863 ));
864 }
865 return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
866 }
867 self.drop_probabilistic_catalog_entry(name)?;
868 self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
869 Ok(RuntimeQueryResult::ok_message(
870 raw_query.to_string(),
871 &format!("FILTER '{}' dropped", name),
872 "drop",
873 ))
874 }
875 }
876 }
877}
878
879fn parse_probabilistic_read_projection(
880 projection: &Projection,
881 index: usize,
882) -> RedDBResult<Option<ProbabilisticReadProjection>> {
883 if let Some(column) = projection_unqualified_column(projection) {
884 if column.eq_ignore_ascii_case("CARDINALITY") {
885 return Ok(Some(ProbabilisticReadProjection::Cardinality {
886 label: probabilistic_projection_label(projection, "cardinality", index),
887 }));
888 }
889 }
890
891 let Some((function, args)) = projection_function(projection) else {
892 return Ok(None);
893 };
894 if function.eq_ignore_ascii_case("FREQ") {
895 let element = projection_single_text_arg(function, args)?;
896 return Ok(Some(ProbabilisticReadProjection::Freq {
897 element,
898 label: probabilistic_projection_label(projection, "freq", index),
899 }));
900 }
901 if function.eq_ignore_ascii_case("CONTAINS") {
902 let element = projection_single_text_arg(function, args)?;
903 return Ok(Some(ProbabilisticReadProjection::Contains {
904 element,
905 label: probabilistic_projection_label(projection, "contains", index),
906 }));
907 }
908
909 Ok(None)
910}
911
912fn validate_probabilistic_read_model(
913 collection: &str,
914 actual_model: crate::catalog::CollectionModel,
915 projections: &[ProbabilisticReadProjection],
916) -> RedDBResult<()> {
917 for projection in projections {
918 let expected_model = match projection {
919 ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
920 ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
921 ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
922 };
923 if actual_model != expected_model {
924 return Err(RedDBError::Query(format!(
925 "{} is only supported for {} collections; '{}' is {}",
926 probabilistic_projection_form(projection),
927 crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
928 collection,
929 crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
930 )));
931 }
932 }
933 Ok(())
934}
935
936impl RedDBRuntime {
937 fn materialize_probabilistic_select_row(
938 &self,
939 collection: &str,
940 projections: &[ProbabilisticReadProjection],
941 ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
942 let mut columns = Vec::with_capacity(projections.len());
943 let mut record = UnifiedRecord::new();
944 for projection in projections {
945 match projection {
946 ProbabilisticReadProjection::Cardinality { label } => {
947 let hlls = probabilistic_read(
948 &self.inner.probabilistic.hlls,
949 "probabilistic HLL store",
950 );
951 let hll = hlls.get(collection).ok_or_else(|| {
952 RedDBError::NotFound(format!("HLL '{}' not found", collection))
953 })?;
954 columns.push(label.clone());
955 record.set(label, Value::UnsignedInteger(hll.count()));
956 }
957 ProbabilisticReadProjection::Freq { element, label } => {
958 let sketches = probabilistic_read(
959 &self.inner.probabilistic.sketches,
960 "probabilistic sketch store",
961 );
962 let sketch = sketches.get(collection).ok_or_else(|| {
963 RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
964 })?;
965 columns.push(label.clone());
966 record.set(
967 label,
968 Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
969 );
970 }
971 ProbabilisticReadProjection::Contains { element, label } => {
972 let filters = probabilistic_read(
973 &self.inner.probabilistic.filters,
974 "probabilistic filter store",
975 );
976 let filter = filters.get(collection).ok_or_else(|| {
977 RedDBError::NotFound(format!("FILTER '{}' not found", collection))
978 })?;
979 columns.push(label.clone());
980 record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
981 }
982 }
983 }
984 Ok((columns, record))
985 }
986}
987
988fn probabilistic_select_row_visible(
989 runtime: &RedDBRuntime,
990 query: &TableQuery,
991 record: &UnifiedRecord,
992) -> bool {
993 if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
994 return false;
995 }
996 let table_name = query.table.as_str();
997 let table_alias = query.alias.as_deref().unwrap_or(table_name);
998 crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
999 super::join_filter::evaluate_runtime_filter_with_db(
1000 Some(&runtime.inner.db),
1001 record,
1002 &filter,
1003 Some(table_name),
1004 Some(table_alias),
1005 )
1006 })
1007}
1008
1009fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1010 match projection {
1011 Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1012 Some(column.as_str())
1013 }
1014 Projection::Column(column) => Some(column.as_str()),
1015 Projection::Alias(column, _) => Some(column.as_str()),
1016 _ => None,
1017 }
1018}
1019
1020fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1021 match projection {
1022 Projection::Function(name, args) => {
1023 let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1024 Some((function, args.as_slice()))
1025 }
1026 _ => None,
1027 }
1028}
1029
1030fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1031 if args.len() != 1 {
1032 return Err(RedDBError::Query(format!(
1033 "{function}(...) expects exactly one string literal"
1034 )));
1035 }
1036 match &args[0] {
1037 Projection::Column(column) => column
1038 .strip_prefix("LIT:")
1039 .map(ToString::to_string)
1040 .ok_or_else(|| {
1041 RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1042 }),
1043 _ => Err(RedDBError::Query(format!(
1044 "{function}(...) expects a string literal argument"
1045 ))),
1046 }
1047}
1048
1049fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1050 match projection {
1051 Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1052 if alias.eq_ignore_ascii_case(column) =>
1053 {
1054 numbered_probabilistic_label(base, index)
1055 }
1056 Projection::Field(_, Some(alias)) => alias.clone(),
1057 Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1058 numbered_probabilistic_label(base, index)
1059 }
1060 Projection::Alias(_, alias) => alias.clone(),
1061 Projection::Function(name, _) => name
1062 .split_once(':')
1063 .map(|(_, alias)| {
1064 if is_generated_probabilistic_function_label(alias, base) {
1065 numbered_probabilistic_label(base, index)
1066 } else {
1067 alias.to_string()
1068 }
1069 })
1070 .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1071 _ => numbered_probabilistic_label(base, index),
1072 }
1073}
1074
/// Reports whether `alias` has the shape `<base>(...)`.
///
/// The `base` prefix is compared ASCII case-insensitively and must be followed
/// immediately by `(`. Aliases too short, or split mid-character at
/// `base.len()`, are not considered generated.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    match (alias.get(..base.len()), alias.get(base.len()..)) {
        (Some(head), Some(rest)) => head.eq_ignore_ascii_case(base) && rest.starts_with('('),
        _ => false,
    }
}
1081
/// Default label for the `index`-th probabilistic projection: the bare `base`
/// for the first one, then `base_2`, `base_3`, … (1-based suffix).
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        position => format!("{base}_{}", position + 1),
    }
}
1089
1090fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1091 match projection {
1092 ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1093 ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1094 ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1095 }
1096}