Skip to main content

reddb_server/runtime/
impl_probabilistic.rs

1//! Execution of probabilistic data structure commands (HLL, SKETCH, FILTER)
2
3use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
/// `red_config` key prefix for persisted HyperLogLog state; the structure
/// name is appended hex-encoded.
const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
/// `red_config` key prefix for persisted Count-Min Sketch state; the
/// structure name is appended hex-encoded.
const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
/// `red_config` key prefix for persisted Cuckoo filter state; the structure
/// name is appended hex-encoded.
const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11    lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15    lock.write()
16}
17
18fn probabilistic_collection_contract(
19    name: &str,
20    model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22    let now = crate::utils::now_unix_millis() as u128;
23    crate::physical::CollectionContract {
24        name: name.to_string(),
25        declared_model: model,
26        schema_mode: crate::catalog::SchemaMode::Dynamic,
27        origin: crate::physical::ContractOrigin::Explicit,
28        version: 1,
29        created_at_unix_ms: now,
30        updated_at_unix_ms: now,
31        default_ttl_ms: None,
32        vector_dimension: None,
33        vector_metric: None,
34        context_index_fields: Vec::new(),
35        declared_columns: Vec::new(),
36        table_def: None,
37        timestamps_enabled: false,
38        context_index_enabled: false,
39        metrics_raw_retention_ms: None,
40        metrics_rollup_policies: Vec::new(),
41        metrics_tenant_identity: None,
42        metrics_namespace: None,
43        append_only: false,
44        subscriptions: Vec::new(),
45        analytics_config: Vec::new(),
46        session_key: None,
47        session_gap_ms: None,
48        retention_duration_ms: None,
49        analytical_storage: None,
50    }
51}
52
/// One probabilistic read form found in a `SELECT` projection list.
///
/// NOTE(review): `label` presumably names the output column produced by the
/// projection; confirm against `materialize_probabilistic_select_row`.
enum ProbabilisticReadProjection {
    /// The `CARDINALITY` read form.
    Cardinality { label: String },
    /// The `FREQ(element)` read form.
    Freq { label: String, element: String },
    /// The `CONTAINS(element)` read form.
    Contains { label: String, element: String },
}
58
59impl RedDBRuntime {
60    pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
61        {
62            let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
63            let mut hlls =
64                probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
65            for (name, data_hex) in entries {
66                let bytes = hex::decode(&data_hex).map_err(|err| {
67                    RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
68                })?;
69                let Some(hll) =
70                    crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
71                else {
72                    return Err(RedDBError::Internal(format!(
73                        "invalid persisted HLL state for '{name}'"
74                    )));
75                };
76                hlls.insert(name, hll);
77            }
78        }
79
80        {
81            let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
82            let mut sketches = probabilistic_write(
83                &self.inner.probabilistic.sketches,
84                "probabilistic sketch store",
85            );
86            for (name, data_hex) in entries {
87                let bytes = hex::decode(&data_hex).map_err(|err| {
88                    RedDBError::Internal(format!(
89                        "invalid persisted SKETCH state for '{name}': {err}"
90                    ))
91                })?;
92                let sketch =
93                    crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
94                        &bytes,
95                    )
96                    .ok_or_else(|| {
97                        RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
98                    })?;
99                sketches.insert(name, sketch);
100            }
101        }
102
103        {
104            let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
105            let mut filters = probabilistic_write(
106                &self.inner.probabilistic.filters,
107                "probabilistic filter store",
108            );
109            for (name, data_hex) in entries {
110                let bytes = hex::decode(&data_hex).map_err(|err| {
111                    RedDBError::Internal(format!(
112                        "invalid persisted FILTER state for '{name}': {err}"
113                    ))
114                })?;
115                let filter =
116                    crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
117                        .ok_or_else(|| {
118                            RedDBError::Internal(format!(
119                                "invalid persisted FILTER state for '{name}'"
120                            ))
121                        })?;
122                filters.insert(name, filter);
123            }
124        }
125
126        Ok(())
127    }
128
129    fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
130        let Some(manager) = self.inner.db.store().get_collection("red_config") else {
131            return Vec::new();
132        };
133        let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
134            std::collections::HashMap::new();
135        for entity in manager.query_all(|_| true) {
136            let EntityData::Row(row) = &entity.data else {
137                continue;
138            };
139            let Some(named) = &row.named else {
140                continue;
141            };
142            let Some(Value::Text(key)) = named.get("key") else {
143                continue;
144            };
145            let Some(encoded_name) = key.strip_prefix(prefix) else {
146                continue;
147            };
148            let value = match named.get("value") {
149                Some(Value::Text(value)) => Some(value.to_string()),
150                Some(Value::Null) => None,
151                _ => continue,
152            };
153            let entity_id = entity.id.raw();
154            match latest.get(encoded_name) {
155                Some((existing_id, _)) if *existing_id > entity_id => {}
156                _ => {
157                    latest.insert(encoded_name.to_string(), (entity_id, value));
158                }
159            }
160        }
161
162        latest
163            .into_iter()
164            .filter_map(|(encoded_name, (_, value))| {
165                let value = value?;
166                let bytes = hex::decode(encoded_name).ok()?;
167                let name = String::from_utf8(bytes).ok()?;
168                Some((name, value))
169            })
170            .collect()
171    }
172
173    fn persist_probabilistic_blob(
174        &self,
175        prefix: &str,
176        name: &str,
177        bytes: &[u8],
178    ) -> RedDBResult<()> {
179        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
180        self.inner
181            .db
182            .store()
183            .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
184        Ok(())
185    }
186
187    fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
188        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
189        self.inner
190            .db
191            .store()
192            .set_config_tree(&key, &crate::serde_json::Value::Null);
193        Ok(())
194    }
195
196    fn create_probabilistic_catalog_entry(
197        &self,
198        name: &str,
199        model: crate::catalog::CollectionModel,
200    ) -> RedDBResult<()> {
201        let store = self.inner.db.store();
202        store
203            .create_collection(name)
204            .map_err(|err| RedDBError::Internal(err.to_string()))?;
205        self.inner
206            .db
207            .save_collection_contract(probabilistic_collection_contract(name, model))
208            .map_err(|err| RedDBError::Internal(err.to_string()))?;
209        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
210            store.set_config_tree(
211                &format!("red.collection_tenants.{name}"),
212                &crate::serde_json::Value::String(tenant_id),
213            );
214        }
215        self.inner
216            .db
217            .persist_metadata()
218            .map_err(|err| RedDBError::Internal(err.to_string()))?;
219        self.invalidate_result_cache();
220        Ok(())
221    }
222
223    fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
224        let store = self.inner.db.store();
225        if store.get_collection(name).is_some() {
226            store
227                .drop_collection(name)
228                .map_err(|err| RedDBError::Internal(err.to_string()))?;
229        }
230        self.inner
231            .db
232            .remove_collection_contract(name)
233            .map_err(|err| RedDBError::Internal(err.to_string()))?;
234        self.inner
235            .db
236            .persist_metadata()
237            .map_err(|err| RedDBError::Internal(err.to_string()))?;
238        self.invalidate_result_cache();
239        Ok(())
240    }
241
242    pub(crate) fn execute_probabilistic_select(
243        &self,
244        query: &TableQuery,
245    ) -> RedDBResult<Option<UnifiedResult>> {
246        let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
247        let mut read_projections = Vec::new();
248        for projection in &projections {
249            if let Some(read_projection) =
250                parse_probabilistic_read_projection(projection, read_projections.len())?
251            {
252                read_projections.push(read_projection);
253            }
254        }
255
256        let Some(actual_model) = self
257            .inner
258            .db
259            .collection_contract(&query.table)
260            .map(|contract| contract.declared_model)
261        else {
262            return if read_projections.is_empty() {
263                Ok(None)
264            } else {
265                Err(RedDBError::NotFound(format!(
266                    "probabilistic collection '{}' not found",
267                    query.table
268                )))
269            };
270        };
271
272        let is_probabilistic_model = matches!(
273            actual_model,
274            crate::catalog::CollectionModel::Hll
275                | crate::catalog::CollectionModel::Sketch
276                | crate::catalog::CollectionModel::Filter
277        );
278        if read_projections.is_empty() {
279            return if is_probabilistic_model {
280                Err(RedDBError::Query(format!(
281                    "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
282                    query.table
283                )))
284            } else {
285                Ok(None)
286            };
287        }
288
289        validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
290        let (columns, record) =
291            self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
292        let mut result = UnifiedResult::with_columns(columns);
293        if probabilistic_select_row_visible(self, query, &record) {
294            result.push(record);
295        }
296        Ok(Some(result))
297    }
298
299    pub fn execute_probabilistic_command(
300        &self,
301        raw_query: &str,
302        cmd: &ProbabilisticCommand,
303    ) -> RedDBResult<RuntimeQueryResult> {
304        // Mixed read/write surface: count/info/check are read-side and
305        // must remain available on read-only replicas; create/add/
306        // merge/delete/drop are mutations and must go through the gate.
307        let is_mutation = matches!(
308            cmd,
309            ProbabilisticCommand::CreateHll { .. }
310                | ProbabilisticCommand::HllAdd { .. }
311                | ProbabilisticCommand::HllMerge { .. }
312                | ProbabilisticCommand::DropHll { .. }
313                | ProbabilisticCommand::CreateSketch { .. }
314                | ProbabilisticCommand::SketchAdd { .. }
315                | ProbabilisticCommand::SketchMerge { .. }
316                | ProbabilisticCommand::DropSketch { .. }
317                | ProbabilisticCommand::CreateFilter { .. }
318                | ProbabilisticCommand::FilterAdd { .. }
319                | ProbabilisticCommand::FilterDelete { .. }
320                | ProbabilisticCommand::DropFilter { .. }
321        );
322        if is_mutation {
323            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
324        }
325        match cmd {
326            // ── HyperLogLog ──────────────────────────────────────────
327            ProbabilisticCommand::CreateHll {
328                name,
329                precision,
330                if_not_exists,
331            } => {
332                let mut hlls =
333                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
334                if hlls.contains_key(name) {
335                    if *if_not_exists {
336                        return Ok(RuntimeQueryResult::ok_message(
337                            raw_query.to_string(),
338                            &format!("HLL '{}' already exists", name),
339                            "create",
340                        ));
341                    }
342                    return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
343                }
344                let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
345                    *precision,
346                )
347                .ok_or_else(|| {
348                    RedDBError::Query(format!(
349                        "HLL precision must be between 4 and 18, got {precision}"
350                    ))
351                })?;
352                self.create_probabilistic_catalog_entry(
353                    name,
354                    crate::catalog::CollectionModel::Hll,
355                )?;
356                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
357                hlls.insert(name.clone(), hll);
358                Ok(RuntimeQueryResult::ok_message(
359                    raw_query.to_string(),
360                    &format!("HLL '{}' created", name),
361                    "create",
362                ))
363            }
364            ProbabilisticCommand::HllAdd { name, elements } => {
365                let mut hlls =
366                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
367                let hll = hlls
368                    .get_mut(name)
369                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
370                for elem in elements {
371                    hll.add(elem.as_bytes());
372                }
373                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
374                Ok(RuntimeQueryResult::ok_message(
375                    raw_query.to_string(),
376                    &format!("{} element(s) added to HLL '{}'", elements.len(), name),
377                    "insert",
378                ))
379            }
380            ProbabilisticCommand::HllCount { names } => {
381                let hlls =
382                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
383                if names.len() == 1 {
384                    let hll = hlls.get(&names[0]).ok_or_else(|| {
385                        RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
386                    })?;
387                    let count = hll.count();
388                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
389                    let mut record = UnifiedRecord::new();
390                    record.set("count", Value::UnsignedInteger(count));
391                    result.push(record);
392                    Ok(RuntimeQueryResult {
393                        query: raw_query.to_string(),
394                        mode: QueryMode::Sql,
395                        statement: "hll_count",
396                        engine: "runtime-probabilistic",
397                        result,
398                        affected_rows: 0,
399                        statement_type: "select",
400                        bookmark: None,
401                    })
402                } else {
403                    // Multi-HLL count = union count
404                    let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
405                    for name in names {
406                        let hll = hlls.get(name).ok_or_else(|| {
407                            RedDBError::NotFound(format!("HLL '{}' not found", name))
408                        })?;
409                        merged.merge(hll);
410                    }
411                    let count = merged.count();
412                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
413                    let mut record = UnifiedRecord::new();
414                    record.set("count", Value::UnsignedInteger(count));
415                    result.push(record);
416                    Ok(RuntimeQueryResult {
417                        query: raw_query.to_string(),
418                        mode: QueryMode::Sql,
419                        statement: "hll_count",
420                        engine: "runtime-probabilistic",
421                        result,
422                        affected_rows: 0,
423                        statement_type: "select",
424                        bookmark: None,
425                    })
426                }
427            }
428            ProbabilisticCommand::HllMerge { dest, sources } => {
429                let mut hlls =
430                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
431                let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
432                for src in sources {
433                    let hll = hlls
434                        .get(src)
435                        .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
436                    merged.merge(hll);
437                }
438                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
439                hlls.insert(dest.clone(), merged);
440                Ok(RuntimeQueryResult::ok_message(
441                    raw_query.to_string(),
442                    &format!(
443                        "HLL '{}' created from merge of {}",
444                        dest,
445                        sources.join(", ")
446                    ),
447                    "create",
448                ))
449            }
450            ProbabilisticCommand::HllInfo { name } => {
451                let hlls =
452                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
453                let hll = hlls
454                    .get(name)
455                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
456                let mut result = UnifiedResult::with_columns(vec![
457                    "name".into(),
458                    "precision".into(),
459                    "count".into(),
460                    "memory_bytes".into(),
461                ]);
462                let mut record = UnifiedRecord::new();
463                record.set("name", Value::text(name.clone()));
464                record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
465                record.set("count", Value::UnsignedInteger(hll.count()));
466                record.set(
467                    "memory_bytes",
468                    Value::UnsignedInteger(hll.memory_bytes() as u64),
469                );
470                result.push(record);
471                Ok(RuntimeQueryResult {
472                    query: raw_query.to_string(),
473                    mode: QueryMode::Sql,
474                    statement: "hll_info",
475                    engine: "runtime-probabilistic",
476                    result,
477                    affected_rows: 0,
478                    statement_type: "select",
479                    bookmark: None,
480                })
481            }
482            ProbabilisticCommand::DropHll { name, if_exists } => {
483                let mut hlls =
484                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
485                if hlls.remove(name).is_none() {
486                    if *if_exists {
487                        return Ok(RuntimeQueryResult::ok_message(
488                            raw_query.to_string(),
489                            &format!("HLL '{}' does not exist", name),
490                            "drop",
491                        ));
492                    }
493                    return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
494                }
495                self.drop_probabilistic_catalog_entry(name)?;
496                self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
497                Ok(RuntimeQueryResult::ok_message(
498                    raw_query.to_string(),
499                    &format!("HLL '{}' dropped", name),
500                    "drop",
501                ))
502            }
503
504            // ── Count-Min Sketch ───────────────────────────────────────
505            ProbabilisticCommand::CreateSketch {
506                name,
507                width,
508                depth,
509                if_not_exists,
510            } => {
511                let mut sketches = probabilistic_write(
512                    &self.inner.probabilistic.sketches,
513                    "probabilistic sketch store",
514                );
515                if sketches.contains_key(name) {
516                    if *if_not_exists {
517                        return Ok(RuntimeQueryResult::ok_message(
518                            raw_query.to_string(),
519                            &format!("SKETCH '{}' already exists", name),
520                            "create",
521                        ));
522                    }
523                    return Err(RedDBError::Query(format!(
524                        "SKETCH '{}' already exists",
525                        name
526                    )));
527                }
528                self.create_probabilistic_catalog_entry(
529                    name,
530                    crate::catalog::CollectionModel::Sketch,
531                )?;
532                let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
533                    *width, *depth,
534                );
535                self.persist_probabilistic_blob(
536                    PROB_SKETCH_STATE_PREFIX,
537                    name,
538                    &sketch.as_bytes(),
539                )?;
540                sketches.insert(name.clone(), sketch);
541                Ok(RuntimeQueryResult::ok_message(
542                    raw_query.to_string(),
543                    &format!(
544                        "SKETCH '{}' created (width={}, depth={})",
545                        name, width, depth
546                    ),
547                    "create",
548                ))
549            }
550            ProbabilisticCommand::SketchAdd {
551                name,
552                element,
553                count,
554            } => {
555                let mut sketches = probabilistic_write(
556                    &self.inner.probabilistic.sketches,
557                    "probabilistic sketch store",
558                );
559                let sketch = sketches
560                    .get_mut(name)
561                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
562                sketch.add(element.as_bytes(), *count);
563                self.persist_probabilistic_blob(
564                    PROB_SKETCH_STATE_PREFIX,
565                    name,
566                    &sketch.as_bytes(),
567                )?;
568                Ok(RuntimeQueryResult::ok_message(
569                    raw_query.to_string(),
570                    &format!("added {} to SKETCH '{}'", count, name),
571                    "insert",
572                ))
573            }
574            ProbabilisticCommand::SketchCount { name, element } => {
575                let sketches = probabilistic_read(
576                    &self.inner.probabilistic.sketches,
577                    "probabilistic sketch store",
578                );
579                let sketch = sketches
580                    .get(name)
581                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
582                let estimate = sketch.estimate(element.as_bytes());
583                let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
584                let mut record = UnifiedRecord::new();
585                record.set("estimate", Value::UnsignedInteger(estimate));
586                result.push(record);
587                Ok(RuntimeQueryResult {
588                    query: raw_query.to_string(),
589                    mode: QueryMode::Sql,
590                    statement: "sketch_count",
591                    engine: "runtime-probabilistic",
592                    result,
593                    affected_rows: 0,
594                    statement_type: "select",
595                    bookmark: None,
596                })
597            }
598            ProbabilisticCommand::SketchMerge { dest, sources } => {
599                let mut sketches = probabilistic_write(
600                    &self.inner.probabilistic.sketches,
601                    "probabilistic sketch store",
602                );
603                let first_src = sketches.get(&sources[0]).ok_or_else(|| {
604                    RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
605                })?;
606                let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
607                    first_src.width(),
608                    first_src.depth(),
609                );
610                for src in sources {
611                    let sketch = sketches.get(src).ok_or_else(|| {
612                        RedDBError::NotFound(format!("SKETCH '{}' not found", src))
613                    })?;
614                    if !merged.merge(sketch) {
615                        return Err(RedDBError::Query(format!(
616                            "SKETCH '{}' has incompatible dimensions",
617                            src
618                        )));
619                    }
620                }
621                self.persist_probabilistic_blob(
622                    PROB_SKETCH_STATE_PREFIX,
623                    dest,
624                    &merged.as_bytes(),
625                )?;
626                sketches.insert(dest.clone(), merged);
627                Ok(RuntimeQueryResult::ok_message(
628                    raw_query.to_string(),
629                    &format!(
630                        "SKETCH '{}' created from merge of {}",
631                        dest,
632                        sources.join(", ")
633                    ),
634                    "create",
635                ))
636            }
637            ProbabilisticCommand::SketchInfo { name } => {
638                let sketches = probabilistic_read(
639                    &self.inner.probabilistic.sketches,
640                    "probabilistic sketch store",
641                );
642                let sketch = sketches
643                    .get(name)
644                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
645                let mut result = UnifiedResult::with_columns(vec![
646                    "name".into(),
647                    "width".into(),
648                    "depth".into(),
649                    "total".into(),
650                    "memory_bytes".into(),
651                ]);
652                let mut record = UnifiedRecord::new();
653                record.set("name", Value::text(name.clone()));
654                record.set("width", Value::UnsignedInteger(sketch.width() as u64));
655                record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
656                record.set("total", Value::UnsignedInteger(sketch.total()));
657                record.set(
658                    "memory_bytes",
659                    Value::UnsignedInteger(sketch.memory_bytes() as u64),
660                );
661                result.push(record);
662                Ok(RuntimeQueryResult {
663                    query: raw_query.to_string(),
664                    mode: QueryMode::Sql,
665                    statement: "sketch_info",
666                    engine: "runtime-probabilistic",
667                    result,
668                    affected_rows: 0,
669                    statement_type: "select",
670                    bookmark: None,
671                })
672            }
673            ProbabilisticCommand::DropSketch { name, if_exists } => {
674                let mut sketches = probabilistic_write(
675                    &self.inner.probabilistic.sketches,
676                    "probabilistic sketch store",
677                );
678                if sketches.remove(name).is_none() {
679                    if *if_exists {
680                        return Ok(RuntimeQueryResult::ok_message(
681                            raw_query.to_string(),
682                            &format!("SKETCH '{}' does not exist", name),
683                            "drop",
684                        ));
685                    }
686                    return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
687                }
688                self.drop_probabilistic_catalog_entry(name)?;
689                self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
690                Ok(RuntimeQueryResult::ok_message(
691                    raw_query.to_string(),
692                    &format!("SKETCH '{}' dropped", name),
693                    "drop",
694                ))
695            }
696
697            // ── Cuckoo Filter ─────────────────────────────────────────
698            ProbabilisticCommand::CreateFilter {
699                name,
700                capacity,
701                if_not_exists,
702            } => {
703                let mut filters = probabilistic_write(
704                    &self.inner.probabilistic.filters,
705                    "probabilistic filter store",
706                );
707                if filters.contains_key(name) {
708                    if *if_not_exists {
709                        return Ok(RuntimeQueryResult::ok_message(
710                            raw_query.to_string(),
711                            &format!("FILTER '{}' already exists", name),
712                            "create",
713                        ));
714                    }
715                    return Err(RedDBError::Query(format!(
716                        "FILTER '{}' already exists",
717                        name
718                    )));
719                }
720                self.create_probabilistic_catalog_entry(
721                    name,
722                    crate::catalog::CollectionModel::Filter,
723                )?;
724                let filter =
725                    crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
726                self.persist_probabilistic_blob(
727                    PROB_FILTER_STATE_PREFIX,
728                    name,
729                    &filter.as_bytes(),
730                )?;
731                filters.insert(name.clone(), filter);
732                Ok(RuntimeQueryResult::ok_message(
733                    raw_query.to_string(),
734                    &format!("FILTER '{}' created (capacity={})", name, capacity),
735                    "create",
736                ))
737            }
738            ProbabilisticCommand::FilterAdd { name, element } => {
739                let mut filters = probabilistic_write(
740                    &self.inner.probabilistic.filters,
741                    "probabilistic filter store",
742                );
743                let filter = filters
744                    .get_mut(name)
745                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
746                if !filter.insert(element.as_bytes()) {
747                    return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
748                }
749                self.persist_probabilistic_blob(
750                    PROB_FILTER_STATE_PREFIX,
751                    name,
752                    &filter.as_bytes(),
753                )?;
754                Ok(RuntimeQueryResult::ok_message(
755                    raw_query.to_string(),
756                    &format!("element added to FILTER '{}'", name),
757                    "insert",
758                ))
759            }
760            ProbabilisticCommand::FilterCheck { name, element } => {
761                let filters = probabilistic_read(
762                    &self.inner.probabilistic.filters,
763                    "probabilistic filter store",
764                );
765                let filter = filters
766                    .get(name)
767                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
768                let exists = filter.contains(element.as_bytes());
769                let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
770                let mut record = UnifiedRecord::new();
771                record.set("exists", Value::Boolean(exists));
772                result.push(record);
773                Ok(RuntimeQueryResult {
774                    query: raw_query.to_string(),
775                    mode: QueryMode::Sql,
776                    statement: "filter_check",
777                    engine: "runtime-probabilistic",
778                    result,
779                    affected_rows: 0,
780                    statement_type: "select",
781                    bookmark: None,
782                })
783            }
784            ProbabilisticCommand::FilterDelete { name, element } => {
785                let mut filters = probabilistic_write(
786                    &self.inner.probabilistic.filters,
787                    "probabilistic filter store",
788                );
789                let filter = filters
790                    .get_mut(name)
791                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
792                let removed = filter.delete(element.as_bytes());
793                self.persist_probabilistic_blob(
794                    PROB_FILTER_STATE_PREFIX,
795                    name,
796                    &filter.as_bytes(),
797                )?;
798                Ok(RuntimeQueryResult::ok_message(
799                    raw_query.to_string(),
800                    &format!(
801                        "element {} from FILTER '{}'",
802                        if removed { "deleted" } else { "not found in" },
803                        name
804                    ),
805                    "delete",
806                ))
807            }
808            ProbabilisticCommand::FilterCount { name } => {
809                let filters = probabilistic_read(
810                    &self.inner.probabilistic.filters,
811                    "probabilistic filter store",
812                );
813                let filter = filters
814                    .get(name)
815                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
816                let mut result = UnifiedResult::with_columns(vec!["count".into()]);
817                let mut record = UnifiedRecord::new();
818                record.set("count", Value::UnsignedInteger(filter.count() as u64));
819                result.push(record);
820                Ok(RuntimeQueryResult {
821                    query: raw_query.to_string(),
822                    mode: QueryMode::Sql,
823                    statement: "filter_count",
824                    engine: "runtime-probabilistic",
825                    result,
826                    affected_rows: 0,
827                    statement_type: "select",
828                    bookmark: None,
829                })
830            }
831            ProbabilisticCommand::FilterInfo { name } => {
832                let filters = probabilistic_read(
833                    &self.inner.probabilistic.filters,
834                    "probabilistic filter store",
835                );
836                let filter = filters
837                    .get(name)
838                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
839                let mut result = UnifiedResult::with_columns(vec![
840                    "name".into(),
841                    "count".into(),
842                    "load_factor".into(),
843                    "memory_bytes".into(),
844                ]);
845                let mut record = UnifiedRecord::new();
846                record.set("name", Value::text(name.clone()));
847                record.set("count", Value::UnsignedInteger(filter.count() as u64));
848                record.set("load_factor", Value::Float(filter.load_factor()));
849                record.set(
850                    "memory_bytes",
851                    Value::UnsignedInteger(filter.memory_bytes() as u64),
852                );
853                result.push(record);
854                Ok(RuntimeQueryResult {
855                    query: raw_query.to_string(),
856                    mode: QueryMode::Sql,
857                    statement: "filter_info",
858                    engine: "runtime-probabilistic",
859                    result,
860                    affected_rows: 0,
861                    statement_type: "select",
862                    bookmark: None,
863                })
864            }
865            ProbabilisticCommand::DropFilter { name, if_exists } => {
866                let mut filters = probabilistic_write(
867                    &self.inner.probabilistic.filters,
868                    "probabilistic filter store",
869                );
870                if filters.remove(name).is_none() {
871                    if *if_exists {
872                        return Ok(RuntimeQueryResult::ok_message(
873                            raw_query.to_string(),
874                            &format!("FILTER '{}' does not exist", name),
875                            "drop",
876                        ));
877                    }
878                    return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
879                }
880                self.drop_probabilistic_catalog_entry(name)?;
881                self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
882                Ok(RuntimeQueryResult::ok_message(
883                    raw_query.to_string(),
884                    &format!("FILTER '{}' dropped", name),
885                    "drop",
886                ))
887            }
888        }
889    }
890}
891
892fn parse_probabilistic_read_projection(
893    projection: &Projection,
894    index: usize,
895) -> RedDBResult<Option<ProbabilisticReadProjection>> {
896    if let Some(column) = projection_unqualified_column(projection) {
897        if column.eq_ignore_ascii_case("CARDINALITY") {
898            return Ok(Some(ProbabilisticReadProjection::Cardinality {
899                label: probabilistic_projection_label(projection, "cardinality", index),
900            }));
901        }
902    }
903
904    let Some((function, args)) = projection_function(projection) else {
905        return Ok(None);
906    };
907    if function.eq_ignore_ascii_case("FREQ") {
908        let element = projection_single_text_arg(function, args)?;
909        return Ok(Some(ProbabilisticReadProjection::Freq {
910            element,
911            label: probabilistic_projection_label(projection, "freq", index),
912        }));
913    }
914    if function.eq_ignore_ascii_case("CONTAINS") {
915        let element = projection_single_text_arg(function, args)?;
916        return Ok(Some(ProbabilisticReadProjection::Contains {
917            element,
918            label: probabilistic_projection_label(projection, "contains", index),
919        }));
920    }
921
922    Ok(None)
923}
924
925fn validate_probabilistic_read_model(
926    collection: &str,
927    actual_model: crate::catalog::CollectionModel,
928    projections: &[ProbabilisticReadProjection],
929) -> RedDBResult<()> {
930    for projection in projections {
931        let expected_model = match projection {
932            ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
933            ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
934            ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
935        };
936        if actual_model != expected_model {
937            return Err(RedDBError::Query(format!(
938                "{} is only supported for {} collections; '{}' is {}",
939                probabilistic_projection_form(projection),
940                crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
941                collection,
942                crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
943            )));
944        }
945    }
946    Ok(())
947}
948
949impl RedDBRuntime {
950    fn materialize_probabilistic_select_row(
951        &self,
952        collection: &str,
953        projections: &[ProbabilisticReadProjection],
954    ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
955        let mut columns = Vec::with_capacity(projections.len());
956        let mut record = UnifiedRecord::new();
957        for projection in projections {
958            match projection {
959                ProbabilisticReadProjection::Cardinality { label } => {
960                    let hlls = probabilistic_read(
961                        &self.inner.probabilistic.hlls,
962                        "probabilistic HLL store",
963                    );
964                    let hll = hlls.get(collection).ok_or_else(|| {
965                        RedDBError::NotFound(format!("HLL '{}' not found", collection))
966                    })?;
967                    columns.push(label.clone());
968                    record.set(label, Value::UnsignedInteger(hll.count()));
969                }
970                ProbabilisticReadProjection::Freq { element, label } => {
971                    let sketches = probabilistic_read(
972                        &self.inner.probabilistic.sketches,
973                        "probabilistic sketch store",
974                    );
975                    let sketch = sketches.get(collection).ok_or_else(|| {
976                        RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
977                    })?;
978                    columns.push(label.clone());
979                    record.set(
980                        label,
981                        Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
982                    );
983                }
984                ProbabilisticReadProjection::Contains { element, label } => {
985                    let filters = probabilistic_read(
986                        &self.inner.probabilistic.filters,
987                        "probabilistic filter store",
988                    );
989                    let filter = filters.get(collection).ok_or_else(|| {
990                        RedDBError::NotFound(format!("FILTER '{}' not found", collection))
991                    })?;
992                    columns.push(label.clone());
993                    record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
994                }
995            }
996        }
997        Ok((columns, record))
998    }
999}
1000
1001fn probabilistic_select_row_visible(
1002    runtime: &RedDBRuntime,
1003    query: &TableQuery,
1004    record: &UnifiedRecord,
1005) -> bool {
1006    if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
1007        return false;
1008    }
1009    let table_name = query.table.as_str();
1010    let table_alias = query.alias.as_deref().unwrap_or(table_name);
1011    crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
1012        super::join_filter::evaluate_runtime_filter_with_db(
1013            Some(&runtime.inner.db),
1014            record,
1015            &filter,
1016            Some(table_name),
1017            Some(table_alias),
1018        )
1019    })
1020}
1021
1022fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1023    match projection {
1024        Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1025            Some(column.as_str())
1026        }
1027        Projection::Column(column) => Some(column.as_str()),
1028        Projection::Alias(column, _) => Some(column.as_str()),
1029        _ => None,
1030    }
1031}
1032
1033fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1034    match projection {
1035        Projection::Function(name, args) => {
1036            let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1037            Some((function, args.as_slice()))
1038        }
1039        _ => None,
1040    }
1041}
1042
1043fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1044    if args.len() != 1 {
1045        return Err(RedDBError::Query(format!(
1046            "{function}(...) expects exactly one string literal"
1047        )));
1048    }
1049    match &args[0] {
1050        Projection::Column(column) => column
1051            .strip_prefix("LIT:")
1052            .map(ToString::to_string)
1053            .ok_or_else(|| {
1054                RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1055            }),
1056        _ => Err(RedDBError::Query(format!(
1057            "{function}(...) expects a string literal argument"
1058        ))),
1059    }
1060}
1061
1062fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1063    match projection {
1064        Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1065            if alias.eq_ignore_ascii_case(column) =>
1066        {
1067            numbered_probabilistic_label(base, index)
1068        }
1069        Projection::Field(_, Some(alias)) => alias.clone(),
1070        Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1071            numbered_probabilistic_label(base, index)
1072        }
1073        Projection::Alias(_, alias) => alias.clone(),
1074        Projection::Function(name, _) => name
1075            .split_once(':')
1076            .map(|(_, alias)| {
1077                if is_generated_probabilistic_function_label(alias, base) {
1078                    numbered_probabilistic_label(base, index)
1079                } else {
1080                    alias.to_string()
1081                }
1082            })
1083            .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1084        _ => numbered_probabilistic_label(base, index),
1085    }
1086}
1087
/// Reports whether `alias` looks like a parser-generated function label.
///
/// Such a label starts with `base` (ASCII case-insensitive) immediately
/// followed by `(`, e.g. `FREQ('x')` for base `freq`. Boundary-checked
/// slicing keeps a multi-byte character in `alias` from causing a panic.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    let split_at = base.len();
    alias
        .get(..split_at)
        .zip(alias.get(split_at..))
        .is_some_and(|(head, rest)| head.eq_ignore_ascii_case(base) && rest.starts_with('('))
}
1094
/// Builds the default label for the projection at zero-based `index`.
///
/// The first projection gets `base` unchanged. Later ones get a one-based
/// suffix, e.g. `freq_2`, `freq_3`.
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_string(),
        position => format!("{base}_{}", position + 1),
    }
}
1102
1103fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1104    match projection {
1105        ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1106        ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1107        ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1108    }
1109}