Skip to main content

reddb_server/runtime/
impl_probabilistic.rs

1//! Execution of probabilistic data structure commands (HLL, SKETCH, FILTER)
2
3use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
/// Config-tree key prefix for persisted HyperLogLog state.
///
/// A full key is this prefix followed by the hex-encoded structure name; the
/// stored value is the hex-encoded serialized structure, or JSON `null` once the
/// structure has been dropped. These literals are part of the persisted format
/// (the loader selects rows by prefix) — changing them orphans existing state.
const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
/// Config-tree key prefix for persisted Count-Min Sketch state (same key/value
/// layout as [`PROB_HLL_STATE_PREFIX`]).
const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
/// Config-tree key prefix for persisted Cuckoo filter state (same key/value
/// layout as [`PROB_HLL_STATE_PREFIX`]).
const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11    lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15    lock.write()
16}
17
18fn probabilistic_collection_contract(
19    name: &str,
20    model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22    let now = crate::utils::now_unix_millis() as u128;
23    crate::physical::CollectionContract {
24        name: name.to_string(),
25        declared_model: model,
26        schema_mode: crate::catalog::SchemaMode::Dynamic,
27        origin: crate::physical::ContractOrigin::Explicit,
28        version: 1,
29        created_at_unix_ms: now,
30        updated_at_unix_ms: now,
31        default_ttl_ms: None,
32        vector_dimension: None,
33        vector_metric: None,
34        context_index_fields: Vec::new(),
35        declared_columns: Vec::new(),
36        table_def: None,
37        timestamps_enabled: false,
38        context_index_enabled: false,
39        metrics_raw_retention_ms: None,
40        metrics_rollup_policies: Vec::new(),
41        metrics_tenant_identity: None,
42        metrics_namespace: None,
43        append_only: false,
44        subscriptions: Vec::new(),
45        analytics_config: Vec::new(),
46        session_key: None,
47        session_gap_ms: None,
48        retention_duration_ms: None,
49        analytical_storage: None,
50
51        ai_policy: None,
52    }
53}
54
/// A probabilistic read form found in a `SELECT` projection list.
///
/// The three shapes correspond to the read forms named in the runtime's error
/// message: `SELECT CARDINALITY`, `FREQ(...)` and `CONTAINS(...)`. Each variant
/// carries the output column `label`. NOTE(review): presumably the label is the
/// projection alias or a default derived from the projection's position —
/// confirm in `parse_probabilistic_read_projection`.
enum ProbabilisticReadProjection {
    /// `CARDINALITY` — presumably the HLL distinct-count estimate.
    Cardinality { label: String },
    /// `FREQ(element)` — presumably the Count-Min Sketch frequency estimate for `element`.
    Freq { element: String, label: String },
    /// `CONTAINS(element)` — presumably the Cuckoo filter membership test for `element`.
    Contains { element: String, label: String },
}
60
61impl RedDBRuntime {
62    pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
63        {
64            let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
65            let mut hlls =
66                probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
67            for (name, data_hex) in entries {
68                let bytes = hex::decode(&data_hex).map_err(|err| {
69                    RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
70                })?;
71                let Some(hll) =
72                    crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
73                else {
74                    return Err(RedDBError::Internal(format!(
75                        "invalid persisted HLL state for '{name}'"
76                    )));
77                };
78                hlls.insert(name, hll);
79            }
80        }
81
82        {
83            let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
84            let mut sketches = probabilistic_write(
85                &self.inner.probabilistic.sketches,
86                "probabilistic sketch store",
87            );
88            for (name, data_hex) in entries {
89                let bytes = hex::decode(&data_hex).map_err(|err| {
90                    RedDBError::Internal(format!(
91                        "invalid persisted SKETCH state for '{name}': {err}"
92                    ))
93                })?;
94                let sketch =
95                    crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
96                        &bytes,
97                    )
98                    .ok_or_else(|| {
99                        RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
100                    })?;
101                sketches.insert(name, sketch);
102            }
103        }
104
105        {
106            let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
107            let mut filters = probabilistic_write(
108                &self.inner.probabilistic.filters,
109                "probabilistic filter store",
110            );
111            for (name, data_hex) in entries {
112                let bytes = hex::decode(&data_hex).map_err(|err| {
113                    RedDBError::Internal(format!(
114                        "invalid persisted FILTER state for '{name}': {err}"
115                    ))
116                })?;
117                let filter =
118                    crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
119                        .ok_or_else(|| {
120                            RedDBError::Internal(format!(
121                                "invalid persisted FILTER state for '{name}'"
122                            ))
123                        })?;
124                filters.insert(name, filter);
125            }
126        }
127
128        Ok(())
129    }
130
131    fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
132        let Some(manager) = self.inner.db.store().get_collection("red_config") else {
133            return Vec::new();
134        };
135        let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
136            std::collections::HashMap::new();
137        for entity in manager.query_all(|_| true) {
138            let EntityData::Row(row) = &entity.data else {
139                continue;
140            };
141            let Some(named) = &row.named else {
142                continue;
143            };
144            let Some(Value::Text(key)) = named.get("key") else {
145                continue;
146            };
147            let Some(encoded_name) = key.strip_prefix(prefix) else {
148                continue;
149            };
150            let value = match named.get("value") {
151                Some(Value::Text(value)) => Some(value.to_string()),
152                Some(Value::Null) => None,
153                _ => continue,
154            };
155            let entity_id = entity.id.raw();
156            match latest.get(encoded_name) {
157                Some((existing_id, _)) if *existing_id > entity_id => {}
158                _ => {
159                    latest.insert(encoded_name.to_string(), (entity_id, value));
160                }
161            }
162        }
163
164        latest
165            .into_iter()
166            .filter_map(|(encoded_name, (_, value))| {
167                let value = value?;
168                let bytes = hex::decode(encoded_name).ok()?;
169                let name = String::from_utf8(bytes).ok()?;
170                Some((name, value))
171            })
172            .collect()
173    }
174
175    fn persist_probabilistic_blob(
176        &self,
177        prefix: &str,
178        name: &str,
179        bytes: &[u8],
180    ) -> RedDBResult<()> {
181        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
182        self.inner
183            .db
184            .store()
185            .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
186        Ok(())
187    }
188
189    fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
190        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
191        self.inner
192            .db
193            .store()
194            .set_config_tree(&key, &crate::serde_json::Value::Null);
195        Ok(())
196    }
197
198    fn create_probabilistic_catalog_entry(
199        &self,
200        name: &str,
201        model: crate::catalog::CollectionModel,
202    ) -> RedDBResult<()> {
203        let store = self.inner.db.store();
204        store
205            .create_collection(name)
206            .map_err(|err| RedDBError::Internal(err.to_string()))?;
207        self.inner
208            .db
209            .save_collection_contract(probabilistic_collection_contract(name, model))
210            .map_err(|err| RedDBError::Internal(err.to_string()))?;
211        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
212            store.set_config_tree(
213                &format!("red.collection_tenants.{name}"),
214                &crate::serde_json::Value::String(tenant_id),
215            );
216        }
217        self.inner
218            .db
219            .persist_metadata()
220            .map_err(|err| RedDBError::Internal(err.to_string()))?;
221        self.invalidate_result_cache();
222        Ok(())
223    }
224
225    fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
226        let store = self.inner.db.store();
227        if store.get_collection(name).is_some() {
228            store
229                .drop_collection(name)
230                .map_err(|err| RedDBError::Internal(err.to_string()))?;
231        }
232        self.inner
233            .db
234            .remove_collection_contract(name)
235            .map_err(|err| RedDBError::Internal(err.to_string()))?;
236        self.inner
237            .db
238            .persist_metadata()
239            .map_err(|err| RedDBError::Internal(err.to_string()))?;
240        self.invalidate_result_cache();
241        Ok(())
242    }
243
244    pub(crate) fn execute_probabilistic_select(
245        &self,
246        query: &TableQuery,
247    ) -> RedDBResult<Option<UnifiedResult>> {
248        let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
249        let mut read_projections = Vec::new();
250        for projection in &projections {
251            if let Some(read_projection) =
252                parse_probabilistic_read_projection(projection, read_projections.len())?
253            {
254                read_projections.push(read_projection);
255            }
256        }
257
258        let Some(actual_model) = self
259            .inner
260            .db
261            .collection_contract(&query.table)
262            .map(|contract| contract.declared_model)
263        else {
264            return if read_projections.is_empty() {
265                Ok(None)
266            } else {
267                Err(RedDBError::NotFound(format!(
268                    "probabilistic collection '{}' not found",
269                    query.table
270                )))
271            };
272        };
273
274        let is_probabilistic_model = matches!(
275            actual_model,
276            crate::catalog::CollectionModel::Hll
277                | crate::catalog::CollectionModel::Sketch
278                | crate::catalog::CollectionModel::Filter
279        );
280        if read_projections.is_empty() {
281            return if is_probabilistic_model {
282                Err(RedDBError::Query(format!(
283                    "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
284                    query.table
285                )))
286            } else {
287                Ok(None)
288            };
289        }
290
291        validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
292        let (columns, record) =
293            self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
294        let mut result = UnifiedResult::with_columns(columns);
295        if probabilistic_select_row_visible(self, query, &record) {
296            result.push(record);
297        }
298        Ok(Some(result))
299    }
300
301    pub fn execute_probabilistic_command(
302        &self,
303        raw_query: &str,
304        cmd: &ProbabilisticCommand,
305    ) -> RedDBResult<RuntimeQueryResult> {
306        // Mixed read/write surface: count/info/check are read-side and
307        // must remain available on read-only replicas; create/add/
308        // merge/delete/drop are mutations and must go through the gate.
309        let is_mutation = matches!(
310            cmd,
311            ProbabilisticCommand::CreateHll { .. }
312                | ProbabilisticCommand::HllAdd { .. }
313                | ProbabilisticCommand::HllMerge { .. }
314                | ProbabilisticCommand::DropHll { .. }
315                | ProbabilisticCommand::CreateSketch { .. }
316                | ProbabilisticCommand::SketchAdd { .. }
317                | ProbabilisticCommand::SketchMerge { .. }
318                | ProbabilisticCommand::DropSketch { .. }
319                | ProbabilisticCommand::CreateFilter { .. }
320                | ProbabilisticCommand::FilterAdd { .. }
321                | ProbabilisticCommand::FilterDelete { .. }
322                | ProbabilisticCommand::DropFilter { .. }
323        );
324        if is_mutation {
325            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
326        }
327        match cmd {
328            // ── HyperLogLog ──────────────────────────────────────────
329            ProbabilisticCommand::CreateHll {
330                name,
331                precision,
332                if_not_exists,
333            } => {
334                let mut hlls =
335                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
336                if hlls.contains_key(name) {
337                    if *if_not_exists {
338                        return Ok(RuntimeQueryResult::ok_message(
339                            raw_query.to_string(),
340                            &format!("HLL '{}' already exists", name),
341                            "create",
342                        ));
343                    }
344                    return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
345                }
346                let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
347                    *precision,
348                )
349                .ok_or_else(|| {
350                    RedDBError::Query(format!(
351                        "HLL precision must be between 4 and 18, got {precision}"
352                    ))
353                })?;
354                self.create_probabilistic_catalog_entry(
355                    name,
356                    crate::catalog::CollectionModel::Hll,
357                )?;
358                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
359                hlls.insert(name.clone(), hll);
360                Ok(RuntimeQueryResult::ok_message(
361                    raw_query.to_string(),
362                    &format!("HLL '{}' created", name),
363                    "create",
364                ))
365            }
366            ProbabilisticCommand::HllAdd { name, elements } => {
367                let mut hlls =
368                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
369                let hll = hlls
370                    .get_mut(name)
371                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
372                for elem in elements {
373                    hll.add(elem.as_bytes());
374                }
375                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
376                Ok(RuntimeQueryResult::ok_message(
377                    raw_query.to_string(),
378                    &format!("{} element(s) added to HLL '{}'", elements.len(), name),
379                    "insert",
380                ))
381            }
382            ProbabilisticCommand::HllCount { names } => {
383                let hlls =
384                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
385                if names.len() == 1 {
386                    let hll = hlls.get(&names[0]).ok_or_else(|| {
387                        RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
388                    })?;
389                    let count = hll.count();
390                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
391                    let mut record = UnifiedRecord::new();
392                    record.set("count", Value::UnsignedInteger(count));
393                    result.push(record);
394                    Ok(RuntimeQueryResult {
395                        query: raw_query.to_string(),
396                        mode: QueryMode::Sql,
397                        statement: "hll_count",
398                        engine: "runtime-probabilistic",
399                        result,
400                        affected_rows: 0,
401                        statement_type: "select",
402                        bookmark: None,
403                    })
404                } else {
405                    // Multi-HLL count = union count
406                    let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
407                    for name in names {
408                        let hll = hlls.get(name).ok_or_else(|| {
409                            RedDBError::NotFound(format!("HLL '{}' not found", name))
410                        })?;
411                        merged.merge(hll);
412                    }
413                    let count = merged.count();
414                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
415                    let mut record = UnifiedRecord::new();
416                    record.set("count", Value::UnsignedInteger(count));
417                    result.push(record);
418                    Ok(RuntimeQueryResult {
419                        query: raw_query.to_string(),
420                        mode: QueryMode::Sql,
421                        statement: "hll_count",
422                        engine: "runtime-probabilistic",
423                        result,
424                        affected_rows: 0,
425                        statement_type: "select",
426                        bookmark: None,
427                    })
428                }
429            }
430            ProbabilisticCommand::HllMerge { dest, sources } => {
431                let mut hlls =
432                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
433                let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
434                for src in sources {
435                    let hll = hlls
436                        .get(src)
437                        .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
438                    merged.merge(hll);
439                }
440                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
441                hlls.insert(dest.clone(), merged);
442                Ok(RuntimeQueryResult::ok_message(
443                    raw_query.to_string(),
444                    &format!(
445                        "HLL '{}' created from merge of {}",
446                        dest,
447                        sources.join(", ")
448                    ),
449                    "create",
450                ))
451            }
452            ProbabilisticCommand::HllInfo { name } => {
453                let hlls =
454                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
455                let hll = hlls
456                    .get(name)
457                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
458                let mut result = UnifiedResult::with_columns(vec![
459                    "name".into(),
460                    "precision".into(),
461                    "count".into(),
462                    "memory_bytes".into(),
463                ]);
464                let mut record = UnifiedRecord::new();
465                record.set("name", Value::text(name.clone()));
466                record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
467                record.set("count", Value::UnsignedInteger(hll.count()));
468                record.set(
469                    "memory_bytes",
470                    Value::UnsignedInteger(hll.memory_bytes() as u64),
471                );
472                result.push(record);
473                Ok(RuntimeQueryResult {
474                    query: raw_query.to_string(),
475                    mode: QueryMode::Sql,
476                    statement: "hll_info",
477                    engine: "runtime-probabilistic",
478                    result,
479                    affected_rows: 0,
480                    statement_type: "select",
481                    bookmark: None,
482                })
483            }
484            ProbabilisticCommand::DropHll { name, if_exists } => {
485                let mut hlls =
486                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
487                if hlls.remove(name).is_none() {
488                    if *if_exists {
489                        return Ok(RuntimeQueryResult::ok_message(
490                            raw_query.to_string(),
491                            &format!("HLL '{}' does not exist", name),
492                            "drop",
493                        ));
494                    }
495                    return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
496                }
497                self.drop_probabilistic_catalog_entry(name)?;
498                self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
499                Ok(RuntimeQueryResult::ok_message(
500                    raw_query.to_string(),
501                    &format!("HLL '{}' dropped", name),
502                    "drop",
503                ))
504            }
505
506            // ── Count-Min Sketch ───────────────────────────────────────
507            ProbabilisticCommand::CreateSketch {
508                name,
509                width,
510                depth,
511                if_not_exists,
512            } => {
513                let mut sketches = probabilistic_write(
514                    &self.inner.probabilistic.sketches,
515                    "probabilistic sketch store",
516                );
517                if sketches.contains_key(name) {
518                    if *if_not_exists {
519                        return Ok(RuntimeQueryResult::ok_message(
520                            raw_query.to_string(),
521                            &format!("SKETCH '{}' already exists", name),
522                            "create",
523                        ));
524                    }
525                    return Err(RedDBError::Query(format!(
526                        "SKETCH '{}' already exists",
527                        name
528                    )));
529                }
530                self.create_probabilistic_catalog_entry(
531                    name,
532                    crate::catalog::CollectionModel::Sketch,
533                )?;
534                let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
535                    *width, *depth,
536                );
537                self.persist_probabilistic_blob(
538                    PROB_SKETCH_STATE_PREFIX,
539                    name,
540                    &sketch.as_bytes(),
541                )?;
542                sketches.insert(name.clone(), sketch);
543                Ok(RuntimeQueryResult::ok_message(
544                    raw_query.to_string(),
545                    &format!(
546                        "SKETCH '{}' created (width={}, depth={})",
547                        name, width, depth
548                    ),
549                    "create",
550                ))
551            }
552            ProbabilisticCommand::SketchAdd {
553                name,
554                element,
555                count,
556            } => {
557                let mut sketches = probabilistic_write(
558                    &self.inner.probabilistic.sketches,
559                    "probabilistic sketch store",
560                );
561                let sketch = sketches
562                    .get_mut(name)
563                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
564                sketch.add(element.as_bytes(), *count);
565                self.persist_probabilistic_blob(
566                    PROB_SKETCH_STATE_PREFIX,
567                    name,
568                    &sketch.as_bytes(),
569                )?;
570                Ok(RuntimeQueryResult::ok_message(
571                    raw_query.to_string(),
572                    &format!("added {} to SKETCH '{}'", count, name),
573                    "insert",
574                ))
575            }
576            ProbabilisticCommand::SketchCount { name, element } => {
577                let sketches = probabilistic_read(
578                    &self.inner.probabilistic.sketches,
579                    "probabilistic sketch store",
580                );
581                let sketch = sketches
582                    .get(name)
583                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
584                let estimate = sketch.estimate(element.as_bytes());
585                let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
586                let mut record = UnifiedRecord::new();
587                record.set("estimate", Value::UnsignedInteger(estimate));
588                result.push(record);
589                Ok(RuntimeQueryResult {
590                    query: raw_query.to_string(),
591                    mode: QueryMode::Sql,
592                    statement: "sketch_count",
593                    engine: "runtime-probabilistic",
594                    result,
595                    affected_rows: 0,
596                    statement_type: "select",
597                    bookmark: None,
598                })
599            }
600            ProbabilisticCommand::SketchMerge { dest, sources } => {
601                let mut sketches = probabilistic_write(
602                    &self.inner.probabilistic.sketches,
603                    "probabilistic sketch store",
604                );
605                let first_src = sketches.get(&sources[0]).ok_or_else(|| {
606                    RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
607                })?;
608                let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
609                    first_src.width(),
610                    first_src.depth(),
611                );
612                for src in sources {
613                    let sketch = sketches.get(src).ok_or_else(|| {
614                        RedDBError::NotFound(format!("SKETCH '{}' not found", src))
615                    })?;
616                    if !merged.merge(sketch) {
617                        return Err(RedDBError::Query(format!(
618                            "SKETCH '{}' has incompatible dimensions",
619                            src
620                        )));
621                    }
622                }
623                self.persist_probabilistic_blob(
624                    PROB_SKETCH_STATE_PREFIX,
625                    dest,
626                    &merged.as_bytes(),
627                )?;
628                sketches.insert(dest.clone(), merged);
629                Ok(RuntimeQueryResult::ok_message(
630                    raw_query.to_string(),
631                    &format!(
632                        "SKETCH '{}' created from merge of {}",
633                        dest,
634                        sources.join(", ")
635                    ),
636                    "create",
637                ))
638            }
639            ProbabilisticCommand::SketchInfo { name } => {
640                let sketches = probabilistic_read(
641                    &self.inner.probabilistic.sketches,
642                    "probabilistic sketch store",
643                );
644                let sketch = sketches
645                    .get(name)
646                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
647                let mut result = UnifiedResult::with_columns(vec![
648                    "name".into(),
649                    "width".into(),
650                    "depth".into(),
651                    "total".into(),
652                    "memory_bytes".into(),
653                ]);
654                let mut record = UnifiedRecord::new();
655                record.set("name", Value::text(name.clone()));
656                record.set("width", Value::UnsignedInteger(sketch.width() as u64));
657                record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
658                record.set("total", Value::UnsignedInteger(sketch.total()));
659                record.set(
660                    "memory_bytes",
661                    Value::UnsignedInteger(sketch.memory_bytes() as u64),
662                );
663                result.push(record);
664                Ok(RuntimeQueryResult {
665                    query: raw_query.to_string(),
666                    mode: QueryMode::Sql,
667                    statement: "sketch_info",
668                    engine: "runtime-probabilistic",
669                    result,
670                    affected_rows: 0,
671                    statement_type: "select",
672                    bookmark: None,
673                })
674            }
675            ProbabilisticCommand::DropSketch { name, if_exists } => {
676                let mut sketches = probabilistic_write(
677                    &self.inner.probabilistic.sketches,
678                    "probabilistic sketch store",
679                );
680                if sketches.remove(name).is_none() {
681                    if *if_exists {
682                        return Ok(RuntimeQueryResult::ok_message(
683                            raw_query.to_string(),
684                            &format!("SKETCH '{}' does not exist", name),
685                            "drop",
686                        ));
687                    }
688                    return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
689                }
690                self.drop_probabilistic_catalog_entry(name)?;
691                self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
692                Ok(RuntimeQueryResult::ok_message(
693                    raw_query.to_string(),
694                    &format!("SKETCH '{}' dropped", name),
695                    "drop",
696                ))
697            }
698
699            // ── Cuckoo Filter ─────────────────────────────────────────
700            ProbabilisticCommand::CreateFilter {
701                name,
702                capacity,
703                if_not_exists,
704            } => {
705                let mut filters = probabilistic_write(
706                    &self.inner.probabilistic.filters,
707                    "probabilistic filter store",
708                );
709                if filters.contains_key(name) {
710                    if *if_not_exists {
711                        return Ok(RuntimeQueryResult::ok_message(
712                            raw_query.to_string(),
713                            &format!("FILTER '{}' already exists", name),
714                            "create",
715                        ));
716                    }
717                    return Err(RedDBError::Query(format!(
718                        "FILTER '{}' already exists",
719                        name
720                    )));
721                }
722                self.create_probabilistic_catalog_entry(
723                    name,
724                    crate::catalog::CollectionModel::Filter,
725                )?;
726                let filter =
727                    crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
728                self.persist_probabilistic_blob(
729                    PROB_FILTER_STATE_PREFIX,
730                    name,
731                    &filter.as_bytes(),
732                )?;
733                filters.insert(name.clone(), filter);
734                Ok(RuntimeQueryResult::ok_message(
735                    raw_query.to_string(),
736                    &format!("FILTER '{}' created (capacity={})", name, capacity),
737                    "create",
738                ))
739            }
740            ProbabilisticCommand::FilterAdd { name, element } => {
741                let mut filters = probabilistic_write(
742                    &self.inner.probabilistic.filters,
743                    "probabilistic filter store",
744                );
745                let filter = filters
746                    .get_mut(name)
747                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
748                if !filter.insert(element.as_bytes()) {
749                    return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
750                }
751                self.persist_probabilistic_blob(
752                    PROB_FILTER_STATE_PREFIX,
753                    name,
754                    &filter.as_bytes(),
755                )?;
756                Ok(RuntimeQueryResult::ok_message(
757                    raw_query.to_string(),
758                    &format!("element added to FILTER '{}'", name),
759                    "insert",
760                ))
761            }
762            ProbabilisticCommand::FilterCheck { name, element } => {
763                let filters = probabilistic_read(
764                    &self.inner.probabilistic.filters,
765                    "probabilistic filter store",
766                );
767                let filter = filters
768                    .get(name)
769                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
770                let exists = filter.contains(element.as_bytes());
771                let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
772                let mut record = UnifiedRecord::new();
773                record.set("exists", Value::Boolean(exists));
774                result.push(record);
775                Ok(RuntimeQueryResult {
776                    query: raw_query.to_string(),
777                    mode: QueryMode::Sql,
778                    statement: "filter_check",
779                    engine: "runtime-probabilistic",
780                    result,
781                    affected_rows: 0,
782                    statement_type: "select",
783                    bookmark: None,
784                })
785            }
786            ProbabilisticCommand::FilterDelete { name, element } => {
787                let mut filters = probabilistic_write(
788                    &self.inner.probabilistic.filters,
789                    "probabilistic filter store",
790                );
791                let filter = filters
792                    .get_mut(name)
793                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
794                let removed = filter.delete(element.as_bytes());
795                self.persist_probabilistic_blob(
796                    PROB_FILTER_STATE_PREFIX,
797                    name,
798                    &filter.as_bytes(),
799                )?;
800                Ok(RuntimeQueryResult::ok_message(
801                    raw_query.to_string(),
802                    &format!(
803                        "element {} from FILTER '{}'",
804                        if removed { "deleted" } else { "not found in" },
805                        name
806                    ),
807                    "delete",
808                ))
809            }
810            ProbabilisticCommand::FilterCount { name } => {
811                let filters = probabilistic_read(
812                    &self.inner.probabilistic.filters,
813                    "probabilistic filter store",
814                );
815                let filter = filters
816                    .get(name)
817                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
818                let mut result = UnifiedResult::with_columns(vec!["count".into()]);
819                let mut record = UnifiedRecord::new();
820                record.set("count", Value::UnsignedInteger(filter.count() as u64));
821                result.push(record);
822                Ok(RuntimeQueryResult {
823                    query: raw_query.to_string(),
824                    mode: QueryMode::Sql,
825                    statement: "filter_count",
826                    engine: "runtime-probabilistic",
827                    result,
828                    affected_rows: 0,
829                    statement_type: "select",
830                    bookmark: None,
831                })
832            }
833            ProbabilisticCommand::FilterInfo { name } => {
834                let filters = probabilistic_read(
835                    &self.inner.probabilistic.filters,
836                    "probabilistic filter store",
837                );
838                let filter = filters
839                    .get(name)
840                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
841                let mut result = UnifiedResult::with_columns(vec![
842                    "name".into(),
843                    "count".into(),
844                    "load_factor".into(),
845                    "memory_bytes".into(),
846                ]);
847                let mut record = UnifiedRecord::new();
848                record.set("name", Value::text(name.clone()));
849                record.set("count", Value::UnsignedInteger(filter.count() as u64));
850                record.set("load_factor", Value::Float(filter.load_factor()));
851                record.set(
852                    "memory_bytes",
853                    Value::UnsignedInteger(filter.memory_bytes() as u64),
854                );
855                result.push(record);
856                Ok(RuntimeQueryResult {
857                    query: raw_query.to_string(),
858                    mode: QueryMode::Sql,
859                    statement: "filter_info",
860                    engine: "runtime-probabilistic",
861                    result,
862                    affected_rows: 0,
863                    statement_type: "select",
864                    bookmark: None,
865                })
866            }
867            ProbabilisticCommand::DropFilter { name, if_exists } => {
868                let mut filters = probabilistic_write(
869                    &self.inner.probabilistic.filters,
870                    "probabilistic filter store",
871                );
872                if filters.remove(name).is_none() {
873                    if *if_exists {
874                        return Ok(RuntimeQueryResult::ok_message(
875                            raw_query.to_string(),
876                            &format!("FILTER '{}' does not exist", name),
877                            "drop",
878                        ));
879                    }
880                    return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
881                }
882                self.drop_probabilistic_catalog_entry(name)?;
883                self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
884                Ok(RuntimeQueryResult::ok_message(
885                    raw_query.to_string(),
886                    &format!("FILTER '{}' dropped", name),
887                    "drop",
888                ))
889            }
890        }
891    }
892}
893
894fn parse_probabilistic_read_projection(
895    projection: &Projection,
896    index: usize,
897) -> RedDBResult<Option<ProbabilisticReadProjection>> {
898    if let Some(column) = projection_unqualified_column(projection) {
899        if column.eq_ignore_ascii_case("CARDINALITY") {
900            return Ok(Some(ProbabilisticReadProjection::Cardinality {
901                label: probabilistic_projection_label(projection, "cardinality", index),
902            }));
903        }
904    }
905
906    let Some((function, args)) = projection_function(projection) else {
907        return Ok(None);
908    };
909    if function.eq_ignore_ascii_case("FREQ") {
910        let element = projection_single_text_arg(function, args)?;
911        return Ok(Some(ProbabilisticReadProjection::Freq {
912            element,
913            label: probabilistic_projection_label(projection, "freq", index),
914        }));
915    }
916    if function.eq_ignore_ascii_case("CONTAINS") {
917        let element = projection_single_text_arg(function, args)?;
918        return Ok(Some(ProbabilisticReadProjection::Contains {
919            element,
920            label: probabilistic_projection_label(projection, "contains", index),
921        }));
922    }
923
924    Ok(None)
925}
926
927fn validate_probabilistic_read_model(
928    collection: &str,
929    actual_model: crate::catalog::CollectionModel,
930    projections: &[ProbabilisticReadProjection],
931) -> RedDBResult<()> {
932    for projection in projections {
933        let expected_model = match projection {
934            ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
935            ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
936            ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
937        };
938        if actual_model != expected_model {
939            return Err(RedDBError::Query(format!(
940                "{} is only supported for {} collections; '{}' is {}",
941                probabilistic_projection_form(projection),
942                crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
943                collection,
944                crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
945            )));
946        }
947    }
948    Ok(())
949}
950
951impl RedDBRuntime {
952    fn materialize_probabilistic_select_row(
953        &self,
954        collection: &str,
955        projections: &[ProbabilisticReadProjection],
956    ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
957        let mut columns = Vec::with_capacity(projections.len());
958        let mut record = UnifiedRecord::new();
959        for projection in projections {
960            match projection {
961                ProbabilisticReadProjection::Cardinality { label } => {
962                    let hlls = probabilistic_read(
963                        &self.inner.probabilistic.hlls,
964                        "probabilistic HLL store",
965                    );
966                    let hll = hlls.get(collection).ok_or_else(|| {
967                        RedDBError::NotFound(format!("HLL '{}' not found", collection))
968                    })?;
969                    columns.push(label.clone());
970                    record.set(label, Value::UnsignedInteger(hll.count()));
971                }
972                ProbabilisticReadProjection::Freq { element, label } => {
973                    let sketches = probabilistic_read(
974                        &self.inner.probabilistic.sketches,
975                        "probabilistic sketch store",
976                    );
977                    let sketch = sketches.get(collection).ok_or_else(|| {
978                        RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
979                    })?;
980                    columns.push(label.clone());
981                    record.set(
982                        label,
983                        Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
984                    );
985                }
986                ProbabilisticReadProjection::Contains { element, label } => {
987                    let filters = probabilistic_read(
988                        &self.inner.probabilistic.filters,
989                        "probabilistic filter store",
990                    );
991                    let filter = filters.get(collection).ok_or_else(|| {
992                        RedDBError::NotFound(format!("FILTER '{}' not found", collection))
993                    })?;
994                    columns.push(label.clone());
995                    record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
996                }
997            }
998        }
999        Ok((columns, record))
1000    }
1001}
1002
1003fn probabilistic_select_row_visible(
1004    runtime: &RedDBRuntime,
1005    query: &TableQuery,
1006    record: &UnifiedRecord,
1007) -> bool {
1008    if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
1009        return false;
1010    }
1011    let table_name = query.table.as_str();
1012    let table_alias = query.alias.as_deref().unwrap_or(table_name);
1013    crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
1014        super::join_filter::evaluate_runtime_filter_with_db(
1015            Some(&runtime.inner.db),
1016            record,
1017            &filter,
1018            Some(table_name),
1019            Some(table_alias),
1020        )
1021    })
1022}
1023
1024fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1025    match projection {
1026        Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1027            Some(column.as_str())
1028        }
1029        Projection::Column(column) => Some(column.as_str()),
1030        Projection::Alias(column, _) => Some(column.as_str()),
1031        _ => None,
1032    }
1033}
1034
1035fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1036    match projection {
1037        Projection::Function(name, args) => {
1038            let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1039            Some((function, args.as_slice()))
1040        }
1041        _ => None,
1042    }
1043}
1044
1045fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1046    if args.len() != 1 {
1047        return Err(RedDBError::Query(format!(
1048            "{function}(...) expects exactly one string literal"
1049        )));
1050    }
1051    match &args[0] {
1052        Projection::Column(column) => column
1053            .strip_prefix("LIT:")
1054            .map(ToString::to_string)
1055            .ok_or_else(|| {
1056                RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1057            }),
1058        _ => Err(RedDBError::Query(format!(
1059            "{function}(...) expects a string literal argument"
1060        ))),
1061    }
1062}
1063
1064fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1065    match projection {
1066        Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1067            if alias.eq_ignore_ascii_case(column) =>
1068        {
1069            numbered_probabilistic_label(base, index)
1070        }
1071        Projection::Field(_, Some(alias)) => alias.clone(),
1072        Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1073            numbered_probabilistic_label(base, index)
1074        }
1075        Projection::Alias(_, alias) => alias.clone(),
1076        Projection::Function(name, _) => name
1077            .split_once(':')
1078            .map(|(_, alias)| {
1079                if is_generated_probabilistic_function_label(alias, base) {
1080                    numbered_probabilistic_label(base, index)
1081                } else {
1082                    alias.to_string()
1083                }
1084            })
1085            .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1086        _ => numbered_probabilistic_label(base, index),
1087    }
1088}
1089
/// Reports whether `alias` has the shape of an auto-generated `base(...)`
/// label.
///
/// Such a label starts with `base` (ASCII case-insensitive) immediately
/// followed by `(`. It returns `false` in two edge cases:
/// * `alias` is shorter than `base`;
/// * `base.len()` does not fall on a character boundary of `alias`.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    let split = base.len();
    match (alias.get(..split), alias.get(split..)) {
        (Some(prefix), Some(rest)) => prefix.eq_ignore_ascii_case(base) && rest.starts_with('('),
        _ => false,
    }
}
1096
/// Builds a default output label from `base` and a zero-based projection
/// position.
///
/// The first projection (`index == 0`) gets `base` unchanged. Later ones get
/// a one-based suffix: for example `freq_2` for `index == 1`.
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        position => format!("{}_{}", base, position + 1),
    }
}
1104
1105fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1106    match projection {
1107        ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1108        ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1109        ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1110    }
1111}