Skip to main content

reddb_server/runtime/
impl_probabilistic.rs

1//! Execution of probabilistic data structure commands (HLL, SKETCH, FILTER)
2
3use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
// Config-key prefixes under which serialized probabilistic structures are
// persisted. The full key is the prefix followed by the hex-encoded structure
// name (see `persist_probabilistic_blob`); on load the prefix is stripped and
// the remainder is hex-decoded back into the name.

/// Key prefix for persisted HyperLogLog state.
const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
/// Key prefix for persisted Count-Min Sketch state.
const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
/// Key prefix for persisted Cuckoo filter state.
const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11    lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15    lock.write()
16}
17
18fn probabilistic_collection_contract(
19    name: &str,
20    model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22    let now = crate::utils::now_unix_millis() as u128;
23    crate::physical::CollectionContract {
24        name: name.to_string(),
25        declared_model: model,
26        schema_mode: crate::catalog::SchemaMode::Dynamic,
27        origin: crate::physical::ContractOrigin::Explicit,
28        version: 1,
29        created_at_unix_ms: now,
30        updated_at_unix_ms: now,
31        default_ttl_ms: None,
32        vector_dimension: None,
33        vector_metric: None,
34        context_index_fields: Vec::new(),
35        declared_columns: Vec::new(),
36        table_def: None,
37        timestamps_enabled: false,
38        context_index_enabled: false,
39        metrics_raw_retention_ms: None,
40        metrics_rollup_policies: Vec::new(),
41        metrics_tenant_identity: None,
42        metrics_namespace: None,
43        append_only: false,
44        subscriptions: Vec::new(),
45        analytics_config: Vec::new(),
46        session_key: None,
47        session_gap_ms: None,
48        retention_duration_ms: None,
49    }
50}
51
/// A read projection recognised in a SELECT against a probabilistic
/// collection. The variants mirror the read forms named in the error raised by
/// `execute_probabilistic_select`: `CARDINALITY`, `FREQ(...)`, and
/// `CONTAINS(...)`.
///
/// NOTE(review): `label` is presumably the output column name for the
/// projection — confirm against `parse_probabilistic_read_projection`.
enum ProbabilisticReadProjection {
    /// `SELECT CARDINALITY` read form.
    Cardinality { label: String },
    /// `SELECT FREQ(...)` read form for `element`.
    Freq { element: String, label: String },
    /// `SELECT CONTAINS(...)` read form for `element`.
    Contains { element: String, label: String },
}
57
58impl RedDBRuntime {
59    pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
60        {
61            let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
62            let mut hlls =
63                probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
64            for (name, data_hex) in entries {
65                let bytes = hex::decode(&data_hex).map_err(|err| {
66                    RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
67                })?;
68                let Some(hll) =
69                    crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
70                else {
71                    return Err(RedDBError::Internal(format!(
72                        "invalid persisted HLL state for '{name}'"
73                    )));
74                };
75                hlls.insert(name, hll);
76            }
77        }
78
79        {
80            let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
81            let mut sketches = probabilistic_write(
82                &self.inner.probabilistic.sketches,
83                "probabilistic sketch store",
84            );
85            for (name, data_hex) in entries {
86                let bytes = hex::decode(&data_hex).map_err(|err| {
87                    RedDBError::Internal(format!(
88                        "invalid persisted SKETCH state for '{name}': {err}"
89                    ))
90                })?;
91                let sketch =
92                    crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
93                        &bytes,
94                    )
95                    .ok_or_else(|| {
96                        RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
97                    })?;
98                sketches.insert(name, sketch);
99            }
100        }
101
102        {
103            let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
104            let mut filters = probabilistic_write(
105                &self.inner.probabilistic.filters,
106                "probabilistic filter store",
107            );
108            for (name, data_hex) in entries {
109                let bytes = hex::decode(&data_hex).map_err(|err| {
110                    RedDBError::Internal(format!(
111                        "invalid persisted FILTER state for '{name}': {err}"
112                    ))
113                })?;
114                let filter =
115                    crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
116                        .ok_or_else(|| {
117                            RedDBError::Internal(format!(
118                                "invalid persisted FILTER state for '{name}'"
119                            ))
120                        })?;
121                filters.insert(name, filter);
122            }
123        }
124
125        Ok(())
126    }
127
128    fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
129        let Some(manager) = self.inner.db.store().get_collection("red_config") else {
130            return Vec::new();
131        };
132        let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
133            std::collections::HashMap::new();
134        for entity in manager.query_all(|_| true) {
135            let EntityData::Row(row) = &entity.data else {
136                continue;
137            };
138            let Some(named) = &row.named else {
139                continue;
140            };
141            let Some(Value::Text(key)) = named.get("key") else {
142                continue;
143            };
144            let Some(encoded_name) = key.strip_prefix(prefix) else {
145                continue;
146            };
147            let value = match named.get("value") {
148                Some(Value::Text(value)) => Some(value.to_string()),
149                Some(Value::Null) => None,
150                _ => continue,
151            };
152            let entity_id = entity.id.raw();
153            match latest.get(encoded_name) {
154                Some((existing_id, _)) if *existing_id > entity_id => {}
155                _ => {
156                    latest.insert(encoded_name.to_string(), (entity_id, value));
157                }
158            }
159        }
160
161        latest
162            .into_iter()
163            .filter_map(|(encoded_name, (_, value))| {
164                let value = value?;
165                let bytes = hex::decode(encoded_name).ok()?;
166                let name = String::from_utf8(bytes).ok()?;
167                Some((name, value))
168            })
169            .collect()
170    }
171
172    fn persist_probabilistic_blob(
173        &self,
174        prefix: &str,
175        name: &str,
176        bytes: &[u8],
177    ) -> RedDBResult<()> {
178        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
179        self.inner
180            .db
181            .store()
182            .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
183        Ok(())
184    }
185
186    fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
187        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
188        self.inner
189            .db
190            .store()
191            .set_config_tree(&key, &crate::serde_json::Value::Null);
192        Ok(())
193    }
194
195    fn create_probabilistic_catalog_entry(
196        &self,
197        name: &str,
198        model: crate::catalog::CollectionModel,
199    ) -> RedDBResult<()> {
200        let store = self.inner.db.store();
201        store
202            .create_collection(name)
203            .map_err(|err| RedDBError::Internal(err.to_string()))?;
204        self.inner
205            .db
206            .save_collection_contract(probabilistic_collection_contract(name, model))
207            .map_err(|err| RedDBError::Internal(err.to_string()))?;
208        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
209            store.set_config_tree(
210                &format!("red.collection_tenants.{name}"),
211                &crate::serde_json::Value::String(tenant_id),
212            );
213        }
214        self.inner
215            .db
216            .persist_metadata()
217            .map_err(|err| RedDBError::Internal(err.to_string()))?;
218        self.invalidate_result_cache();
219        Ok(())
220    }
221
222    fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
223        let store = self.inner.db.store();
224        if store.get_collection(name).is_some() {
225            store
226                .drop_collection(name)
227                .map_err(|err| RedDBError::Internal(err.to_string()))?;
228        }
229        self.inner
230            .db
231            .remove_collection_contract(name)
232            .map_err(|err| RedDBError::Internal(err.to_string()))?;
233        self.inner
234            .db
235            .persist_metadata()
236            .map_err(|err| RedDBError::Internal(err.to_string()))?;
237        self.invalidate_result_cache();
238        Ok(())
239    }
240
241    pub(crate) fn execute_probabilistic_select(
242        &self,
243        query: &TableQuery,
244    ) -> RedDBResult<Option<UnifiedResult>> {
245        let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
246        let mut read_projections = Vec::new();
247        for projection in &projections {
248            if let Some(read_projection) =
249                parse_probabilistic_read_projection(projection, read_projections.len())?
250            {
251                read_projections.push(read_projection);
252            }
253        }
254
255        let Some(actual_model) = self
256            .inner
257            .db
258            .collection_contract(&query.table)
259            .map(|contract| contract.declared_model)
260        else {
261            return if read_projections.is_empty() {
262                Ok(None)
263            } else {
264                Err(RedDBError::NotFound(format!(
265                    "probabilistic collection '{}' not found",
266                    query.table
267                )))
268            };
269        };
270
271        let is_probabilistic_model = matches!(
272            actual_model,
273            crate::catalog::CollectionModel::Hll
274                | crate::catalog::CollectionModel::Sketch
275                | crate::catalog::CollectionModel::Filter
276        );
277        if read_projections.is_empty() {
278            return if is_probabilistic_model {
279                Err(RedDBError::Query(format!(
280                    "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
281                    query.table
282                )))
283            } else {
284                Ok(None)
285            };
286        }
287
288        validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
289        let (columns, record) =
290            self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
291        let mut result = UnifiedResult::with_columns(columns);
292        if probabilistic_select_row_visible(self, query, &record) {
293            result.push(record);
294        }
295        Ok(Some(result))
296    }
297
298    pub fn execute_probabilistic_command(
299        &self,
300        raw_query: &str,
301        cmd: &ProbabilisticCommand,
302    ) -> RedDBResult<RuntimeQueryResult> {
303        // Mixed read/write surface: count/info/check are read-side and
304        // must remain available on read-only replicas; create/add/
305        // merge/delete/drop are mutations and must go through the gate.
306        let is_mutation = matches!(
307            cmd,
308            ProbabilisticCommand::CreateHll { .. }
309                | ProbabilisticCommand::HllAdd { .. }
310                | ProbabilisticCommand::HllMerge { .. }
311                | ProbabilisticCommand::DropHll { .. }
312                | ProbabilisticCommand::CreateSketch { .. }
313                | ProbabilisticCommand::SketchAdd { .. }
314                | ProbabilisticCommand::SketchMerge { .. }
315                | ProbabilisticCommand::DropSketch { .. }
316                | ProbabilisticCommand::CreateFilter { .. }
317                | ProbabilisticCommand::FilterAdd { .. }
318                | ProbabilisticCommand::FilterDelete { .. }
319                | ProbabilisticCommand::DropFilter { .. }
320        );
321        if is_mutation {
322            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
323        }
324        match cmd {
325            // ── HyperLogLog ──────────────────────────────────────────
326            ProbabilisticCommand::CreateHll {
327                name,
328                precision,
329                if_not_exists,
330            } => {
331                let mut hlls =
332                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
333                if hlls.contains_key(name) {
334                    if *if_not_exists {
335                        return Ok(RuntimeQueryResult::ok_message(
336                            raw_query.to_string(),
337                            &format!("HLL '{}' already exists", name),
338                            "create",
339                        ));
340                    }
341                    return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
342                }
343                let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
344                    *precision,
345                )
346                .ok_or_else(|| {
347                    RedDBError::Query(format!(
348                        "HLL precision must be between 4 and 18, got {precision}"
349                    ))
350                })?;
351                self.create_probabilistic_catalog_entry(
352                    name,
353                    crate::catalog::CollectionModel::Hll,
354                )?;
355                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
356                hlls.insert(name.clone(), hll);
357                Ok(RuntimeQueryResult::ok_message(
358                    raw_query.to_string(),
359                    &format!("HLL '{}' created", name),
360                    "create",
361                ))
362            }
363            ProbabilisticCommand::HllAdd { name, elements } => {
364                let mut hlls =
365                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
366                let hll = hlls
367                    .get_mut(name)
368                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
369                for elem in elements {
370                    hll.add(elem.as_bytes());
371                }
372                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
373                Ok(RuntimeQueryResult::ok_message(
374                    raw_query.to_string(),
375                    &format!("{} element(s) added to HLL '{}'", elements.len(), name),
376                    "insert",
377                ))
378            }
379            ProbabilisticCommand::HllCount { names } => {
380                let hlls =
381                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
382                if names.len() == 1 {
383                    let hll = hlls.get(&names[0]).ok_or_else(|| {
384                        RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
385                    })?;
386                    let count = hll.count();
387                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
388                    let mut record = UnifiedRecord::new();
389                    record.set("count", Value::UnsignedInteger(count));
390                    result.push(record);
391                    Ok(RuntimeQueryResult {
392                        query: raw_query.to_string(),
393                        mode: QueryMode::Sql,
394                        statement: "hll_count",
395                        engine: "runtime-probabilistic",
396                        result,
397                        affected_rows: 0,
398                        statement_type: "select",
399                        bookmark: None,
400                    })
401                } else {
402                    // Multi-HLL count = union count
403                    let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
404                    for name in names {
405                        let hll = hlls.get(name).ok_or_else(|| {
406                            RedDBError::NotFound(format!("HLL '{}' not found", name))
407                        })?;
408                        merged.merge(hll);
409                    }
410                    let count = merged.count();
411                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
412                    let mut record = UnifiedRecord::new();
413                    record.set("count", Value::UnsignedInteger(count));
414                    result.push(record);
415                    Ok(RuntimeQueryResult {
416                        query: raw_query.to_string(),
417                        mode: QueryMode::Sql,
418                        statement: "hll_count",
419                        engine: "runtime-probabilistic",
420                        result,
421                        affected_rows: 0,
422                        statement_type: "select",
423                        bookmark: None,
424                    })
425                }
426            }
427            ProbabilisticCommand::HllMerge { dest, sources } => {
428                let mut hlls =
429                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
430                let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
431                for src in sources {
432                    let hll = hlls
433                        .get(src)
434                        .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
435                    merged.merge(hll);
436                }
437                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
438                hlls.insert(dest.clone(), merged);
439                Ok(RuntimeQueryResult::ok_message(
440                    raw_query.to_string(),
441                    &format!(
442                        "HLL '{}' created from merge of {}",
443                        dest,
444                        sources.join(", ")
445                    ),
446                    "create",
447                ))
448            }
449            ProbabilisticCommand::HllInfo { name } => {
450                let hlls =
451                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
452                let hll = hlls
453                    .get(name)
454                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
455                let mut result = UnifiedResult::with_columns(vec![
456                    "name".into(),
457                    "precision".into(),
458                    "count".into(),
459                    "memory_bytes".into(),
460                ]);
461                let mut record = UnifiedRecord::new();
462                record.set("name", Value::text(name.clone()));
463                record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
464                record.set("count", Value::UnsignedInteger(hll.count()));
465                record.set(
466                    "memory_bytes",
467                    Value::UnsignedInteger(hll.memory_bytes() as u64),
468                );
469                result.push(record);
470                Ok(RuntimeQueryResult {
471                    query: raw_query.to_string(),
472                    mode: QueryMode::Sql,
473                    statement: "hll_info",
474                    engine: "runtime-probabilistic",
475                    result,
476                    affected_rows: 0,
477                    statement_type: "select",
478                    bookmark: None,
479                })
480            }
481            ProbabilisticCommand::DropHll { name, if_exists } => {
482                let mut hlls =
483                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
484                if hlls.remove(name).is_none() {
485                    if *if_exists {
486                        return Ok(RuntimeQueryResult::ok_message(
487                            raw_query.to_string(),
488                            &format!("HLL '{}' does not exist", name),
489                            "drop",
490                        ));
491                    }
492                    return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
493                }
494                self.drop_probabilistic_catalog_entry(name)?;
495                self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
496                Ok(RuntimeQueryResult::ok_message(
497                    raw_query.to_string(),
498                    &format!("HLL '{}' dropped", name),
499                    "drop",
500                ))
501            }
502
503            // ── Count-Min Sketch ───────────────────────────────────────
504            ProbabilisticCommand::CreateSketch {
505                name,
506                width,
507                depth,
508                if_not_exists,
509            } => {
510                let mut sketches = probabilistic_write(
511                    &self.inner.probabilistic.sketches,
512                    "probabilistic sketch store",
513                );
514                if sketches.contains_key(name) {
515                    if *if_not_exists {
516                        return Ok(RuntimeQueryResult::ok_message(
517                            raw_query.to_string(),
518                            &format!("SKETCH '{}' already exists", name),
519                            "create",
520                        ));
521                    }
522                    return Err(RedDBError::Query(format!(
523                        "SKETCH '{}' already exists",
524                        name
525                    )));
526                }
527                self.create_probabilistic_catalog_entry(
528                    name,
529                    crate::catalog::CollectionModel::Sketch,
530                )?;
531                let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
532                    *width, *depth,
533                );
534                self.persist_probabilistic_blob(
535                    PROB_SKETCH_STATE_PREFIX,
536                    name,
537                    &sketch.as_bytes(),
538                )?;
539                sketches.insert(name.clone(), sketch);
540                Ok(RuntimeQueryResult::ok_message(
541                    raw_query.to_string(),
542                    &format!(
543                        "SKETCH '{}' created (width={}, depth={})",
544                        name, width, depth
545                    ),
546                    "create",
547                ))
548            }
549            ProbabilisticCommand::SketchAdd {
550                name,
551                element,
552                count,
553            } => {
554                let mut sketches = probabilistic_write(
555                    &self.inner.probabilistic.sketches,
556                    "probabilistic sketch store",
557                );
558                let sketch = sketches
559                    .get_mut(name)
560                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
561                sketch.add(element.as_bytes(), *count);
562                self.persist_probabilistic_blob(
563                    PROB_SKETCH_STATE_PREFIX,
564                    name,
565                    &sketch.as_bytes(),
566                )?;
567                Ok(RuntimeQueryResult::ok_message(
568                    raw_query.to_string(),
569                    &format!("added {} to SKETCH '{}'", count, name),
570                    "insert",
571                ))
572            }
573            ProbabilisticCommand::SketchCount { name, element } => {
574                let sketches = probabilistic_read(
575                    &self.inner.probabilistic.sketches,
576                    "probabilistic sketch store",
577                );
578                let sketch = sketches
579                    .get(name)
580                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
581                let estimate = sketch.estimate(element.as_bytes());
582                let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
583                let mut record = UnifiedRecord::new();
584                record.set("estimate", Value::UnsignedInteger(estimate));
585                result.push(record);
586                Ok(RuntimeQueryResult {
587                    query: raw_query.to_string(),
588                    mode: QueryMode::Sql,
589                    statement: "sketch_count",
590                    engine: "runtime-probabilistic",
591                    result,
592                    affected_rows: 0,
593                    statement_type: "select",
594                    bookmark: None,
595                })
596            }
597            ProbabilisticCommand::SketchMerge { dest, sources } => {
598                let mut sketches = probabilistic_write(
599                    &self.inner.probabilistic.sketches,
600                    "probabilistic sketch store",
601                );
602                let first_src = sketches.get(&sources[0]).ok_or_else(|| {
603                    RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
604                })?;
605                let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
606                    first_src.width(),
607                    first_src.depth(),
608                );
609                for src in sources {
610                    let sketch = sketches.get(src).ok_or_else(|| {
611                        RedDBError::NotFound(format!("SKETCH '{}' not found", src))
612                    })?;
613                    if !merged.merge(sketch) {
614                        return Err(RedDBError::Query(format!(
615                            "SKETCH '{}' has incompatible dimensions",
616                            src
617                        )));
618                    }
619                }
620                self.persist_probabilistic_blob(
621                    PROB_SKETCH_STATE_PREFIX,
622                    dest,
623                    &merged.as_bytes(),
624                )?;
625                sketches.insert(dest.clone(), merged);
626                Ok(RuntimeQueryResult::ok_message(
627                    raw_query.to_string(),
628                    &format!(
629                        "SKETCH '{}' created from merge of {}",
630                        dest,
631                        sources.join(", ")
632                    ),
633                    "create",
634                ))
635            }
636            ProbabilisticCommand::SketchInfo { name } => {
637                let sketches = probabilistic_read(
638                    &self.inner.probabilistic.sketches,
639                    "probabilistic sketch store",
640                );
641                let sketch = sketches
642                    .get(name)
643                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
644                let mut result = UnifiedResult::with_columns(vec![
645                    "name".into(),
646                    "width".into(),
647                    "depth".into(),
648                    "total".into(),
649                    "memory_bytes".into(),
650                ]);
651                let mut record = UnifiedRecord::new();
652                record.set("name", Value::text(name.clone()));
653                record.set("width", Value::UnsignedInteger(sketch.width() as u64));
654                record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
655                record.set("total", Value::UnsignedInteger(sketch.total()));
656                record.set(
657                    "memory_bytes",
658                    Value::UnsignedInteger(sketch.memory_bytes() as u64),
659                );
660                result.push(record);
661                Ok(RuntimeQueryResult {
662                    query: raw_query.to_string(),
663                    mode: QueryMode::Sql,
664                    statement: "sketch_info",
665                    engine: "runtime-probabilistic",
666                    result,
667                    affected_rows: 0,
668                    statement_type: "select",
669                    bookmark: None,
670                })
671            }
672            ProbabilisticCommand::DropSketch { name, if_exists } => {
673                let mut sketches = probabilistic_write(
674                    &self.inner.probabilistic.sketches,
675                    "probabilistic sketch store",
676                );
677                if sketches.remove(name).is_none() {
678                    if *if_exists {
679                        return Ok(RuntimeQueryResult::ok_message(
680                            raw_query.to_string(),
681                            &format!("SKETCH '{}' does not exist", name),
682                            "drop",
683                        ));
684                    }
685                    return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
686                }
687                self.drop_probabilistic_catalog_entry(name)?;
688                self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
689                Ok(RuntimeQueryResult::ok_message(
690                    raw_query.to_string(),
691                    &format!("SKETCH '{}' dropped", name),
692                    "drop",
693                ))
694            }
695
696            // ── Cuckoo Filter ─────────────────────────────────────────
697            ProbabilisticCommand::CreateFilter {
698                name,
699                capacity,
700                if_not_exists,
701            } => {
702                let mut filters = probabilistic_write(
703                    &self.inner.probabilistic.filters,
704                    "probabilistic filter store",
705                );
706                if filters.contains_key(name) {
707                    if *if_not_exists {
708                        return Ok(RuntimeQueryResult::ok_message(
709                            raw_query.to_string(),
710                            &format!("FILTER '{}' already exists", name),
711                            "create",
712                        ));
713                    }
714                    return Err(RedDBError::Query(format!(
715                        "FILTER '{}' already exists",
716                        name
717                    )));
718                }
719                self.create_probabilistic_catalog_entry(
720                    name,
721                    crate::catalog::CollectionModel::Filter,
722                )?;
723                let filter =
724                    crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
725                self.persist_probabilistic_blob(
726                    PROB_FILTER_STATE_PREFIX,
727                    name,
728                    &filter.as_bytes(),
729                )?;
730                filters.insert(name.clone(), filter);
731                Ok(RuntimeQueryResult::ok_message(
732                    raw_query.to_string(),
733                    &format!("FILTER '{}' created (capacity={})", name, capacity),
734                    "create",
735                ))
736            }
737            ProbabilisticCommand::FilterAdd { name, element } => {
738                let mut filters = probabilistic_write(
739                    &self.inner.probabilistic.filters,
740                    "probabilistic filter store",
741                );
742                let filter = filters
743                    .get_mut(name)
744                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
745                if !filter.insert(element.as_bytes()) {
746                    return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
747                }
748                self.persist_probabilistic_blob(
749                    PROB_FILTER_STATE_PREFIX,
750                    name,
751                    &filter.as_bytes(),
752                )?;
753                Ok(RuntimeQueryResult::ok_message(
754                    raw_query.to_string(),
755                    &format!("element added to FILTER '{}'", name),
756                    "insert",
757                ))
758            }
759            ProbabilisticCommand::FilterCheck { name, element } => {
760                let filters = probabilistic_read(
761                    &self.inner.probabilistic.filters,
762                    "probabilistic filter store",
763                );
764                let filter = filters
765                    .get(name)
766                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
767                let exists = filter.contains(element.as_bytes());
768                let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
769                let mut record = UnifiedRecord::new();
770                record.set("exists", Value::Boolean(exists));
771                result.push(record);
772                Ok(RuntimeQueryResult {
773                    query: raw_query.to_string(),
774                    mode: QueryMode::Sql,
775                    statement: "filter_check",
776                    engine: "runtime-probabilistic",
777                    result,
778                    affected_rows: 0,
779                    statement_type: "select",
780                    bookmark: None,
781                })
782            }
783            ProbabilisticCommand::FilterDelete { name, element } => {
784                let mut filters = probabilistic_write(
785                    &self.inner.probabilistic.filters,
786                    "probabilistic filter store",
787                );
788                let filter = filters
789                    .get_mut(name)
790                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
791                let removed = filter.delete(element.as_bytes());
792                self.persist_probabilistic_blob(
793                    PROB_FILTER_STATE_PREFIX,
794                    name,
795                    &filter.as_bytes(),
796                )?;
797                Ok(RuntimeQueryResult::ok_message(
798                    raw_query.to_string(),
799                    &format!(
800                        "element {} from FILTER '{}'",
801                        if removed { "deleted" } else { "not found in" },
802                        name
803                    ),
804                    "delete",
805                ))
806            }
807            ProbabilisticCommand::FilterCount { name } => {
808                let filters = probabilistic_read(
809                    &self.inner.probabilistic.filters,
810                    "probabilistic filter store",
811                );
812                let filter = filters
813                    .get(name)
814                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
815                let mut result = UnifiedResult::with_columns(vec!["count".into()]);
816                let mut record = UnifiedRecord::new();
817                record.set("count", Value::UnsignedInteger(filter.count() as u64));
818                result.push(record);
819                Ok(RuntimeQueryResult {
820                    query: raw_query.to_string(),
821                    mode: QueryMode::Sql,
822                    statement: "filter_count",
823                    engine: "runtime-probabilistic",
824                    result,
825                    affected_rows: 0,
826                    statement_type: "select",
827                    bookmark: None,
828                })
829            }
830            ProbabilisticCommand::FilterInfo { name } => {
831                let filters = probabilistic_read(
832                    &self.inner.probabilistic.filters,
833                    "probabilistic filter store",
834                );
835                let filter = filters
836                    .get(name)
837                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
838                let mut result = UnifiedResult::with_columns(vec![
839                    "name".into(),
840                    "count".into(),
841                    "load_factor".into(),
842                    "memory_bytes".into(),
843                ]);
844                let mut record = UnifiedRecord::new();
845                record.set("name", Value::text(name.clone()));
846                record.set("count", Value::UnsignedInteger(filter.count() as u64));
847                record.set("load_factor", Value::Float(filter.load_factor()));
848                record.set(
849                    "memory_bytes",
850                    Value::UnsignedInteger(filter.memory_bytes() as u64),
851                );
852                result.push(record);
853                Ok(RuntimeQueryResult {
854                    query: raw_query.to_string(),
855                    mode: QueryMode::Sql,
856                    statement: "filter_info",
857                    engine: "runtime-probabilistic",
858                    result,
859                    affected_rows: 0,
860                    statement_type: "select",
861                    bookmark: None,
862                })
863            }
864            ProbabilisticCommand::DropFilter { name, if_exists } => {
865                let mut filters = probabilistic_write(
866                    &self.inner.probabilistic.filters,
867                    "probabilistic filter store",
868                );
869                if filters.remove(name).is_none() {
870                    if *if_exists {
871                        return Ok(RuntimeQueryResult::ok_message(
872                            raw_query.to_string(),
873                            &format!("FILTER '{}' does not exist", name),
874                            "drop",
875                        ));
876                    }
877                    return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
878                }
879                self.drop_probabilistic_catalog_entry(name)?;
880                self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
881                Ok(RuntimeQueryResult::ok_message(
882                    raw_query.to_string(),
883                    &format!("FILTER '{}' dropped", name),
884                    "drop",
885                ))
886            }
887        }
888    }
889}
890
891fn parse_probabilistic_read_projection(
892    projection: &Projection,
893    index: usize,
894) -> RedDBResult<Option<ProbabilisticReadProjection>> {
895    if let Some(column) = projection_unqualified_column(projection) {
896        if column.eq_ignore_ascii_case("CARDINALITY") {
897            return Ok(Some(ProbabilisticReadProjection::Cardinality {
898                label: probabilistic_projection_label(projection, "cardinality", index),
899            }));
900        }
901    }
902
903    let Some((function, args)) = projection_function(projection) else {
904        return Ok(None);
905    };
906    if function.eq_ignore_ascii_case("FREQ") {
907        let element = projection_single_text_arg(function, args)?;
908        return Ok(Some(ProbabilisticReadProjection::Freq {
909            element,
910            label: probabilistic_projection_label(projection, "freq", index),
911        }));
912    }
913    if function.eq_ignore_ascii_case("CONTAINS") {
914        let element = projection_single_text_arg(function, args)?;
915        return Ok(Some(ProbabilisticReadProjection::Contains {
916            element,
917            label: probabilistic_projection_label(projection, "contains", index),
918        }));
919    }
920
921    Ok(None)
922}
923
924fn validate_probabilistic_read_model(
925    collection: &str,
926    actual_model: crate::catalog::CollectionModel,
927    projections: &[ProbabilisticReadProjection],
928) -> RedDBResult<()> {
929    for projection in projections {
930        let expected_model = match projection {
931            ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
932            ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
933            ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
934        };
935        if actual_model != expected_model {
936            return Err(RedDBError::Query(format!(
937                "{} is only supported for {} collections; '{}' is {}",
938                probabilistic_projection_form(projection),
939                crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
940                collection,
941                crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
942            )));
943        }
944    }
945    Ok(())
946}
947
948impl RedDBRuntime {
949    fn materialize_probabilistic_select_row(
950        &self,
951        collection: &str,
952        projections: &[ProbabilisticReadProjection],
953    ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
954        let mut columns = Vec::with_capacity(projections.len());
955        let mut record = UnifiedRecord::new();
956        for projection in projections {
957            match projection {
958                ProbabilisticReadProjection::Cardinality { label } => {
959                    let hlls = probabilistic_read(
960                        &self.inner.probabilistic.hlls,
961                        "probabilistic HLL store",
962                    );
963                    let hll = hlls.get(collection).ok_or_else(|| {
964                        RedDBError::NotFound(format!("HLL '{}' not found", collection))
965                    })?;
966                    columns.push(label.clone());
967                    record.set(label, Value::UnsignedInteger(hll.count()));
968                }
969                ProbabilisticReadProjection::Freq { element, label } => {
970                    let sketches = probabilistic_read(
971                        &self.inner.probabilistic.sketches,
972                        "probabilistic sketch store",
973                    );
974                    let sketch = sketches.get(collection).ok_or_else(|| {
975                        RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
976                    })?;
977                    columns.push(label.clone());
978                    record.set(
979                        label,
980                        Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
981                    );
982                }
983                ProbabilisticReadProjection::Contains { element, label } => {
984                    let filters = probabilistic_read(
985                        &self.inner.probabilistic.filters,
986                        "probabilistic filter store",
987                    );
988                    let filter = filters.get(collection).ok_or_else(|| {
989                        RedDBError::NotFound(format!("FILTER '{}' not found", collection))
990                    })?;
991                    columns.push(label.clone());
992                    record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
993                }
994            }
995        }
996        Ok((columns, record))
997    }
998}
999
1000fn probabilistic_select_row_visible(
1001    runtime: &RedDBRuntime,
1002    query: &TableQuery,
1003    record: &UnifiedRecord,
1004) -> bool {
1005    if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
1006        return false;
1007    }
1008    let table_name = query.table.as_str();
1009    let table_alias = query.alias.as_deref().unwrap_or(table_name);
1010    crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
1011        super::join_filter::evaluate_runtime_filter_with_db(
1012            Some(&runtime.inner.db),
1013            record,
1014            &filter,
1015            Some(table_name),
1016            Some(table_alias),
1017        )
1018    })
1019}
1020
1021fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1022    match projection {
1023        Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1024            Some(column.as_str())
1025        }
1026        Projection::Column(column) => Some(column.as_str()),
1027        Projection::Alias(column, _) => Some(column.as_str()),
1028        _ => None,
1029    }
1030}
1031
1032fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1033    match projection {
1034        Projection::Function(name, args) => {
1035            let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1036            Some((function, args.as_slice()))
1037        }
1038        _ => None,
1039    }
1040}
1041
1042fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1043    if args.len() != 1 {
1044        return Err(RedDBError::Query(format!(
1045            "{function}(...) expects exactly one string literal"
1046        )));
1047    }
1048    match &args[0] {
1049        Projection::Column(column) => column
1050            .strip_prefix("LIT:")
1051            .map(ToString::to_string)
1052            .ok_or_else(|| {
1053                RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1054            }),
1055        _ => Err(RedDBError::Query(format!(
1056            "{function}(...) expects a string literal argument"
1057        ))),
1058    }
1059}
1060
1061fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1062    match projection {
1063        Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1064            if alias.eq_ignore_ascii_case(column) =>
1065        {
1066            numbered_probabilistic_label(base, index)
1067        }
1068        Projection::Field(_, Some(alias)) => alias.clone(),
1069        Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1070            numbered_probabilistic_label(base, index)
1071        }
1072        Projection::Alias(_, alias) => alias.clone(),
1073        Projection::Function(name, _) => name
1074            .split_once(':')
1075            .map(|(_, alias)| {
1076                if is_generated_probabilistic_function_label(alias, base) {
1077                    numbered_probabilistic_label(base, index)
1078                } else {
1079                    alias.to_string()
1080                }
1081            })
1082            .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1083        _ => numbered_probabilistic_label(base, index),
1084    }
1085}
1086
/// Reports whether `alias` has the shape `base(` … — that is, it starts with
/// `base` (compared ASCII case-insensitively) immediately followed by an
/// opening parenthesis.
///
/// Returns `false` when `alias` is shorter than `base` or when `base.len()`
/// does not fall on a character boundary of `alias`.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    let split = base.len();
    match (alias.get(..split), alias.get(split..)) {
        (Some(head), Some(tail)) => head.eq_ignore_ascii_case(base) && tail.starts_with('('),
        _ => false,
    }
}
1093
/// Builds the default label for the projection at position `index`.
///
/// The first projection (`index == 0`) gets the bare `base`; any later one
/// gets `base` followed by `_` and its one-based position, e.g. `freq_2`
/// for `index == 1`.
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        position => format!("{base}_{}", position + 1),
    }
}
1101
1102fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1103    match projection {
1104        ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1105        ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1106        ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1107    }
1108}