Skip to main content

reddb_server/runtime/
impl_probabilistic.rs

1//! Execution of probabilistic data structure commands (HLL, SKETCH, FILTER)
2
3use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
6const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
7const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
8const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11    lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15    lock.write()
16}
17
18fn probabilistic_collection_contract(
19    name: &str,
20    model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22    let now = crate::utils::now_unix_millis() as u128;
23    crate::physical::CollectionContract {
24        name: name.to_string(),
25        declared_model: model,
26        schema_mode: crate::catalog::SchemaMode::Dynamic,
27        origin: crate::physical::ContractOrigin::Explicit,
28        version: 1,
29        created_at_unix_ms: now,
30        updated_at_unix_ms: now,
31        default_ttl_ms: None,
32        vector_dimension: None,
33        vector_metric: None,
34        context_index_fields: Vec::new(),
35        declared_columns: Vec::new(),
36        table_def: None,
37        timestamps_enabled: false,
38        context_index_enabled: false,
39        metrics_raw_retention_ms: None,
40        metrics_rollup_policies: Vec::new(),
41        metrics_tenant_identity: None,
42        metrics_namespace: None,
43        append_only: false,
44        subscriptions: Vec::new(),
45        session_key: None,
46        session_gap_ms: None,
47        retention_duration_ms: None,
48    }
49}
50
/// A read-side projection recognised in a `SELECT` against a probabilistic
/// collection (the `CARDINALITY`, `FREQ(...)` and `CONTAINS(...)` read forms).
///
/// NOTE(review): `label` is presumably the output column name for the
/// projection; confirm against `parse_probabilistic_read_projection`.
enum ProbabilisticReadProjection {
    /// `CARDINALITY` read form.
    Cardinality {
        label: String,
    },
    /// `FREQ(element)` read form.
    Freq {
        element: String,
        label: String,
    },
    /// `CONTAINS(element)` read form.
    Contains {
        element: String,
        label: String,
    },
}
56
57impl RedDBRuntime {
58    pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
59        {
60            let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
61            let mut hlls =
62                probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
63            for (name, data_hex) in entries {
64                let bytes = hex::decode(&data_hex).map_err(|err| {
65                    RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
66                })?;
67                let Some(hll) =
68                    crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
69                else {
70                    return Err(RedDBError::Internal(format!(
71                        "invalid persisted HLL state for '{name}'"
72                    )));
73                };
74                hlls.insert(name, hll);
75            }
76        }
77
78        {
79            let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
80            let mut sketches = probabilistic_write(
81                &self.inner.probabilistic.sketches,
82                "probabilistic sketch store",
83            );
84            for (name, data_hex) in entries {
85                let bytes = hex::decode(&data_hex).map_err(|err| {
86                    RedDBError::Internal(format!(
87                        "invalid persisted SKETCH state for '{name}': {err}"
88                    ))
89                })?;
90                let sketch =
91                    crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
92                        &bytes,
93                    )
94                    .ok_or_else(|| {
95                        RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
96                    })?;
97                sketches.insert(name, sketch);
98            }
99        }
100
101        {
102            let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
103            let mut filters = probabilistic_write(
104                &self.inner.probabilistic.filters,
105                "probabilistic filter store",
106            );
107            for (name, data_hex) in entries {
108                let bytes = hex::decode(&data_hex).map_err(|err| {
109                    RedDBError::Internal(format!(
110                        "invalid persisted FILTER state for '{name}': {err}"
111                    ))
112                })?;
113                let filter =
114                    crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
115                        .ok_or_else(|| {
116                            RedDBError::Internal(format!(
117                                "invalid persisted FILTER state for '{name}'"
118                            ))
119                        })?;
120                filters.insert(name, filter);
121            }
122        }
123
124        Ok(())
125    }
126
127    fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
128        let Some(manager) = self.inner.db.store().get_collection("red_config") else {
129            return Vec::new();
130        };
131        let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
132            std::collections::HashMap::new();
133        for entity in manager.query_all(|_| true) {
134            let EntityData::Row(row) = &entity.data else {
135                continue;
136            };
137            let Some(named) = &row.named else {
138                continue;
139            };
140            let Some(Value::Text(key)) = named.get("key") else {
141                continue;
142            };
143            let Some(encoded_name) = key.strip_prefix(prefix) else {
144                continue;
145            };
146            let value = match named.get("value") {
147                Some(Value::Text(value)) => Some(value.to_string()),
148                Some(Value::Null) => None,
149                _ => continue,
150            };
151            let entity_id = entity.id.raw();
152            match latest.get(encoded_name) {
153                Some((existing_id, _)) if *existing_id > entity_id => {}
154                _ => {
155                    latest.insert(encoded_name.to_string(), (entity_id, value));
156                }
157            }
158        }
159
160        latest
161            .into_iter()
162            .filter_map(|(encoded_name, (_, value))| {
163                let value = value?;
164                let bytes = hex::decode(encoded_name).ok()?;
165                let name = String::from_utf8(bytes).ok()?;
166                Some((name, value))
167            })
168            .collect()
169    }
170
171    fn persist_probabilistic_blob(
172        &self,
173        prefix: &str,
174        name: &str,
175        bytes: &[u8],
176    ) -> RedDBResult<()> {
177        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
178        self.inner
179            .db
180            .store()
181            .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
182        Ok(())
183    }
184
185    fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
186        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
187        self.inner
188            .db
189            .store()
190            .set_config_tree(&key, &crate::serde_json::Value::Null);
191        Ok(())
192    }
193
194    fn create_probabilistic_catalog_entry(
195        &self,
196        name: &str,
197        model: crate::catalog::CollectionModel,
198    ) -> RedDBResult<()> {
199        let store = self.inner.db.store();
200        store
201            .create_collection(name)
202            .map_err(|err| RedDBError::Internal(err.to_string()))?;
203        self.inner
204            .db
205            .save_collection_contract(probabilistic_collection_contract(name, model))
206            .map_err(|err| RedDBError::Internal(err.to_string()))?;
207        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
208            store.set_config_tree(
209                &format!("red.collection_tenants.{name}"),
210                &crate::serde_json::Value::String(tenant_id),
211            );
212        }
213        self.inner
214            .db
215            .persist_metadata()
216            .map_err(|err| RedDBError::Internal(err.to_string()))?;
217        self.invalidate_result_cache();
218        Ok(())
219    }
220
221    fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
222        let store = self.inner.db.store();
223        if store.get_collection(name).is_some() {
224            store
225                .drop_collection(name)
226                .map_err(|err| RedDBError::Internal(err.to_string()))?;
227        }
228        self.inner
229            .db
230            .remove_collection_contract(name)
231            .map_err(|err| RedDBError::Internal(err.to_string()))?;
232        self.inner
233            .db
234            .persist_metadata()
235            .map_err(|err| RedDBError::Internal(err.to_string()))?;
236        self.invalidate_result_cache();
237        Ok(())
238    }
239
240    pub(crate) fn execute_probabilistic_select(
241        &self,
242        query: &TableQuery,
243    ) -> RedDBResult<Option<UnifiedResult>> {
244        let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
245        let mut read_projections = Vec::new();
246        for projection in &projections {
247            if let Some(read_projection) =
248                parse_probabilistic_read_projection(projection, read_projections.len())?
249            {
250                read_projections.push(read_projection);
251            }
252        }
253
254        let Some(actual_model) = self
255            .inner
256            .db
257            .collection_contract(&query.table)
258            .map(|contract| contract.declared_model)
259        else {
260            return if read_projections.is_empty() {
261                Ok(None)
262            } else {
263                Err(RedDBError::NotFound(format!(
264                    "probabilistic collection '{}' not found",
265                    query.table
266                )))
267            };
268        };
269
270        let is_probabilistic_model = matches!(
271            actual_model,
272            crate::catalog::CollectionModel::Hll
273                | crate::catalog::CollectionModel::Sketch
274                | crate::catalog::CollectionModel::Filter
275        );
276        if read_projections.is_empty() {
277            return if is_probabilistic_model {
278                Err(RedDBError::Query(format!(
279                    "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
280                    query.table
281                )))
282            } else {
283                Ok(None)
284            };
285        }
286
287        validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
288        let (columns, record) =
289            self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
290        let mut result = UnifiedResult::with_columns(columns);
291        if probabilistic_select_row_visible(self, query, &record) {
292            result.push(record);
293        }
294        Ok(Some(result))
295    }
296
297    pub fn execute_probabilistic_command(
298        &self,
299        raw_query: &str,
300        cmd: &ProbabilisticCommand,
301    ) -> RedDBResult<RuntimeQueryResult> {
302        // Mixed read/write surface: count/info/check are read-side and
303        // must remain available on read-only replicas; create/add/
304        // merge/delete/drop are mutations and must go through the gate.
305        let is_mutation = matches!(
306            cmd,
307            ProbabilisticCommand::CreateHll { .. }
308                | ProbabilisticCommand::HllAdd { .. }
309                | ProbabilisticCommand::HllMerge { .. }
310                | ProbabilisticCommand::DropHll { .. }
311                | ProbabilisticCommand::CreateSketch { .. }
312                | ProbabilisticCommand::SketchAdd { .. }
313                | ProbabilisticCommand::SketchMerge { .. }
314                | ProbabilisticCommand::DropSketch { .. }
315                | ProbabilisticCommand::CreateFilter { .. }
316                | ProbabilisticCommand::FilterAdd { .. }
317                | ProbabilisticCommand::FilterDelete { .. }
318                | ProbabilisticCommand::DropFilter { .. }
319        );
320        if is_mutation {
321            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
322        }
323        match cmd {
324            // ── HyperLogLog ──────────────────────────────────────────
325            ProbabilisticCommand::CreateHll {
326                name,
327                precision,
328                if_not_exists,
329            } => {
330                let mut hlls =
331                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
332                if hlls.contains_key(name) {
333                    if *if_not_exists {
334                        return Ok(RuntimeQueryResult::ok_message(
335                            raw_query.to_string(),
336                            &format!("HLL '{}' already exists", name),
337                            "create",
338                        ));
339                    }
340                    return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
341                }
342                let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
343                    *precision,
344                )
345                .ok_or_else(|| {
346                    RedDBError::Query(format!(
347                        "HLL precision must be between 4 and 18, got {precision}"
348                    ))
349                })?;
350                self.create_probabilistic_catalog_entry(
351                    name,
352                    crate::catalog::CollectionModel::Hll,
353                )?;
354                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
355                hlls.insert(name.clone(), hll);
356                Ok(RuntimeQueryResult::ok_message(
357                    raw_query.to_string(),
358                    &format!("HLL '{}' created", name),
359                    "create",
360                ))
361            }
362            ProbabilisticCommand::HllAdd { name, elements } => {
363                let mut hlls =
364                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
365                let hll = hlls
366                    .get_mut(name)
367                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
368                for elem in elements {
369                    hll.add(elem.as_bytes());
370                }
371                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
372                Ok(RuntimeQueryResult::ok_message(
373                    raw_query.to_string(),
374                    &format!("{} element(s) added to HLL '{}'", elements.len(), name),
375                    "insert",
376                ))
377            }
378            ProbabilisticCommand::HllCount { names } => {
379                let hlls =
380                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
381                if names.len() == 1 {
382                    let hll = hlls.get(&names[0]).ok_or_else(|| {
383                        RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
384                    })?;
385                    let count = hll.count();
386                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
387                    let mut record = UnifiedRecord::new();
388                    record.set("count", Value::UnsignedInteger(count));
389                    result.push(record);
390                    Ok(RuntimeQueryResult {
391                        query: raw_query.to_string(),
392                        mode: QueryMode::Sql,
393                        statement: "hll_count",
394                        engine: "runtime-probabilistic",
395                        result,
396                        affected_rows: 0,
397                        statement_type: "select",
398                    })
399                } else {
400                    // Multi-HLL count = union count
401                    let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
402                    for name in names {
403                        let hll = hlls.get(name).ok_or_else(|| {
404                            RedDBError::NotFound(format!("HLL '{}' not found", name))
405                        })?;
406                        merged.merge(hll);
407                    }
408                    let count = merged.count();
409                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
410                    let mut record = UnifiedRecord::new();
411                    record.set("count", Value::UnsignedInteger(count));
412                    result.push(record);
413                    Ok(RuntimeQueryResult {
414                        query: raw_query.to_string(),
415                        mode: QueryMode::Sql,
416                        statement: "hll_count",
417                        engine: "runtime-probabilistic",
418                        result,
419                        affected_rows: 0,
420                        statement_type: "select",
421                    })
422                }
423            }
424            ProbabilisticCommand::HllMerge { dest, sources } => {
425                let mut hlls =
426                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
427                let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
428                for src in sources {
429                    let hll = hlls
430                        .get(src)
431                        .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
432                    merged.merge(hll);
433                }
434                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
435                hlls.insert(dest.clone(), merged);
436                Ok(RuntimeQueryResult::ok_message(
437                    raw_query.to_string(),
438                    &format!(
439                        "HLL '{}' created from merge of {}",
440                        dest,
441                        sources.join(", ")
442                    ),
443                    "create",
444                ))
445            }
446            ProbabilisticCommand::HllInfo { name } => {
447                let hlls =
448                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
449                let hll = hlls
450                    .get(name)
451                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
452                let mut result = UnifiedResult::with_columns(vec![
453                    "name".into(),
454                    "precision".into(),
455                    "count".into(),
456                    "memory_bytes".into(),
457                ]);
458                let mut record = UnifiedRecord::new();
459                record.set("name", Value::text(name.clone()));
460                record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
461                record.set("count", Value::UnsignedInteger(hll.count()));
462                record.set(
463                    "memory_bytes",
464                    Value::UnsignedInteger(hll.memory_bytes() as u64),
465                );
466                result.push(record);
467                Ok(RuntimeQueryResult {
468                    query: raw_query.to_string(),
469                    mode: QueryMode::Sql,
470                    statement: "hll_info",
471                    engine: "runtime-probabilistic",
472                    result,
473                    affected_rows: 0,
474                    statement_type: "select",
475                })
476            }
477            ProbabilisticCommand::DropHll { name, if_exists } => {
478                let mut hlls =
479                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
480                if hlls.remove(name).is_none() {
481                    if *if_exists {
482                        return Ok(RuntimeQueryResult::ok_message(
483                            raw_query.to_string(),
484                            &format!("HLL '{}' does not exist", name),
485                            "drop",
486                        ));
487                    }
488                    return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
489                }
490                self.drop_probabilistic_catalog_entry(name)?;
491                self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
492                Ok(RuntimeQueryResult::ok_message(
493                    raw_query.to_string(),
494                    &format!("HLL '{}' dropped", name),
495                    "drop",
496                ))
497            }
498
499            // ── Count-Min Sketch ───────────────────────────────────────
500            ProbabilisticCommand::CreateSketch {
501                name,
502                width,
503                depth,
504                if_not_exists,
505            } => {
506                let mut sketches = probabilistic_write(
507                    &self.inner.probabilistic.sketches,
508                    "probabilistic sketch store",
509                );
510                if sketches.contains_key(name) {
511                    if *if_not_exists {
512                        return Ok(RuntimeQueryResult::ok_message(
513                            raw_query.to_string(),
514                            &format!("SKETCH '{}' already exists", name),
515                            "create",
516                        ));
517                    }
518                    return Err(RedDBError::Query(format!(
519                        "SKETCH '{}' already exists",
520                        name
521                    )));
522                }
523                self.create_probabilistic_catalog_entry(
524                    name,
525                    crate::catalog::CollectionModel::Sketch,
526                )?;
527                let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
528                    *width, *depth,
529                );
530                self.persist_probabilistic_blob(
531                    PROB_SKETCH_STATE_PREFIX,
532                    name,
533                    &sketch.as_bytes(),
534                )?;
535                sketches.insert(name.clone(), sketch);
536                Ok(RuntimeQueryResult::ok_message(
537                    raw_query.to_string(),
538                    &format!(
539                        "SKETCH '{}' created (width={}, depth={})",
540                        name, width, depth
541                    ),
542                    "create",
543                ))
544            }
545            ProbabilisticCommand::SketchAdd {
546                name,
547                element,
548                count,
549            } => {
550                let mut sketches = probabilistic_write(
551                    &self.inner.probabilistic.sketches,
552                    "probabilistic sketch store",
553                );
554                let sketch = sketches
555                    .get_mut(name)
556                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
557                sketch.add(element.as_bytes(), *count);
558                self.persist_probabilistic_blob(
559                    PROB_SKETCH_STATE_PREFIX,
560                    name,
561                    &sketch.as_bytes(),
562                )?;
563                Ok(RuntimeQueryResult::ok_message(
564                    raw_query.to_string(),
565                    &format!("added {} to SKETCH '{}'", count, name),
566                    "insert",
567                ))
568            }
569            ProbabilisticCommand::SketchCount { name, element } => {
570                let sketches = probabilistic_read(
571                    &self.inner.probabilistic.sketches,
572                    "probabilistic sketch store",
573                );
574                let sketch = sketches
575                    .get(name)
576                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
577                let estimate = sketch.estimate(element.as_bytes());
578                let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
579                let mut record = UnifiedRecord::new();
580                record.set("estimate", Value::UnsignedInteger(estimate));
581                result.push(record);
582                Ok(RuntimeQueryResult {
583                    query: raw_query.to_string(),
584                    mode: QueryMode::Sql,
585                    statement: "sketch_count",
586                    engine: "runtime-probabilistic",
587                    result,
588                    affected_rows: 0,
589                    statement_type: "select",
590                })
591            }
592            ProbabilisticCommand::SketchMerge { dest, sources } => {
593                let mut sketches = probabilistic_write(
594                    &self.inner.probabilistic.sketches,
595                    "probabilistic sketch store",
596                );
597                let first_src = sketches.get(&sources[0]).ok_or_else(|| {
598                    RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
599                })?;
600                let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
601                    first_src.width(),
602                    first_src.depth(),
603                );
604                for src in sources {
605                    let sketch = sketches.get(src).ok_or_else(|| {
606                        RedDBError::NotFound(format!("SKETCH '{}' not found", src))
607                    })?;
608                    if !merged.merge(sketch) {
609                        return Err(RedDBError::Query(format!(
610                            "SKETCH '{}' has incompatible dimensions",
611                            src
612                        )));
613                    }
614                }
615                self.persist_probabilistic_blob(
616                    PROB_SKETCH_STATE_PREFIX,
617                    dest,
618                    &merged.as_bytes(),
619                )?;
620                sketches.insert(dest.clone(), merged);
621                Ok(RuntimeQueryResult::ok_message(
622                    raw_query.to_string(),
623                    &format!(
624                        "SKETCH '{}' created from merge of {}",
625                        dest,
626                        sources.join(", ")
627                    ),
628                    "create",
629                ))
630            }
631            ProbabilisticCommand::SketchInfo { name } => {
632                let sketches = probabilistic_read(
633                    &self.inner.probabilistic.sketches,
634                    "probabilistic sketch store",
635                );
636                let sketch = sketches
637                    .get(name)
638                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
639                let mut result = UnifiedResult::with_columns(vec![
640                    "name".into(),
641                    "width".into(),
642                    "depth".into(),
643                    "total".into(),
644                    "memory_bytes".into(),
645                ]);
646                let mut record = UnifiedRecord::new();
647                record.set("name", Value::text(name.clone()));
648                record.set("width", Value::UnsignedInteger(sketch.width() as u64));
649                record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
650                record.set("total", Value::UnsignedInteger(sketch.total()));
651                record.set(
652                    "memory_bytes",
653                    Value::UnsignedInteger(sketch.memory_bytes() as u64),
654                );
655                result.push(record);
656                Ok(RuntimeQueryResult {
657                    query: raw_query.to_string(),
658                    mode: QueryMode::Sql,
659                    statement: "sketch_info",
660                    engine: "runtime-probabilistic",
661                    result,
662                    affected_rows: 0,
663                    statement_type: "select",
664                })
665            }
666            ProbabilisticCommand::DropSketch { name, if_exists } => {
667                let mut sketches = probabilistic_write(
668                    &self.inner.probabilistic.sketches,
669                    "probabilistic sketch store",
670                );
671                if sketches.remove(name).is_none() {
672                    if *if_exists {
673                        return Ok(RuntimeQueryResult::ok_message(
674                            raw_query.to_string(),
675                            &format!("SKETCH '{}' does not exist", name),
676                            "drop",
677                        ));
678                    }
679                    return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
680                }
681                self.drop_probabilistic_catalog_entry(name)?;
682                self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
683                Ok(RuntimeQueryResult::ok_message(
684                    raw_query.to_string(),
685                    &format!("SKETCH '{}' dropped", name),
686                    "drop",
687                ))
688            }
689
690            // ── Cuckoo Filter ─────────────────────────────────────────
691            ProbabilisticCommand::CreateFilter {
692                name,
693                capacity,
694                if_not_exists,
695            } => {
696                let mut filters = probabilistic_write(
697                    &self.inner.probabilistic.filters,
698                    "probabilistic filter store",
699                );
700                if filters.contains_key(name) {
701                    if *if_not_exists {
702                        return Ok(RuntimeQueryResult::ok_message(
703                            raw_query.to_string(),
704                            &format!("FILTER '{}' already exists", name),
705                            "create",
706                        ));
707                    }
708                    return Err(RedDBError::Query(format!(
709                        "FILTER '{}' already exists",
710                        name
711                    )));
712                }
713                self.create_probabilistic_catalog_entry(
714                    name,
715                    crate::catalog::CollectionModel::Filter,
716                )?;
717                let filter =
718                    crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
719                self.persist_probabilistic_blob(
720                    PROB_FILTER_STATE_PREFIX,
721                    name,
722                    &filter.as_bytes(),
723                )?;
724                filters.insert(name.clone(), filter);
725                Ok(RuntimeQueryResult::ok_message(
726                    raw_query.to_string(),
727                    &format!("FILTER '{}' created (capacity={})", name, capacity),
728                    "create",
729                ))
730            }
731            ProbabilisticCommand::FilterAdd { name, element } => {
732                let mut filters = probabilistic_write(
733                    &self.inner.probabilistic.filters,
734                    "probabilistic filter store",
735                );
736                let filter = filters
737                    .get_mut(name)
738                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
739                if !filter.insert(element.as_bytes()) {
740                    return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
741                }
742                self.persist_probabilistic_blob(
743                    PROB_FILTER_STATE_PREFIX,
744                    name,
745                    &filter.as_bytes(),
746                )?;
747                Ok(RuntimeQueryResult::ok_message(
748                    raw_query.to_string(),
749                    &format!("element added to FILTER '{}'", name),
750                    "insert",
751                ))
752            }
753            ProbabilisticCommand::FilterCheck { name, element } => {
754                let filters = probabilistic_read(
755                    &self.inner.probabilistic.filters,
756                    "probabilistic filter store",
757                );
758                let filter = filters
759                    .get(name)
760                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
761                let exists = filter.contains(element.as_bytes());
762                let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
763                let mut record = UnifiedRecord::new();
764                record.set("exists", Value::Boolean(exists));
765                result.push(record);
766                Ok(RuntimeQueryResult {
767                    query: raw_query.to_string(),
768                    mode: QueryMode::Sql,
769                    statement: "filter_check",
770                    engine: "runtime-probabilistic",
771                    result,
772                    affected_rows: 0,
773                    statement_type: "select",
774                })
775            }
776            ProbabilisticCommand::FilterDelete { name, element } => {
777                let mut filters = probabilistic_write(
778                    &self.inner.probabilistic.filters,
779                    "probabilistic filter store",
780                );
781                let filter = filters
782                    .get_mut(name)
783                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
784                let removed = filter.delete(element.as_bytes());
785                self.persist_probabilistic_blob(
786                    PROB_FILTER_STATE_PREFIX,
787                    name,
788                    &filter.as_bytes(),
789                )?;
790                Ok(RuntimeQueryResult::ok_message(
791                    raw_query.to_string(),
792                    &format!(
793                        "element {} from FILTER '{}'",
794                        if removed { "deleted" } else { "not found in" },
795                        name
796                    ),
797                    "delete",
798                ))
799            }
800            ProbabilisticCommand::FilterCount { name } => {
801                let filters = probabilistic_read(
802                    &self.inner.probabilistic.filters,
803                    "probabilistic filter store",
804                );
805                let filter = filters
806                    .get(name)
807                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
808                let mut result = UnifiedResult::with_columns(vec!["count".into()]);
809                let mut record = UnifiedRecord::new();
810                record.set("count", Value::UnsignedInteger(filter.count() as u64));
811                result.push(record);
812                Ok(RuntimeQueryResult {
813                    query: raw_query.to_string(),
814                    mode: QueryMode::Sql,
815                    statement: "filter_count",
816                    engine: "runtime-probabilistic",
817                    result,
818                    affected_rows: 0,
819                    statement_type: "select",
820                })
821            }
822            ProbabilisticCommand::FilterInfo { name } => {
823                let filters = probabilistic_read(
824                    &self.inner.probabilistic.filters,
825                    "probabilistic filter store",
826                );
827                let filter = filters
828                    .get(name)
829                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
830                let mut result = UnifiedResult::with_columns(vec![
831                    "name".into(),
832                    "count".into(),
833                    "load_factor".into(),
834                    "memory_bytes".into(),
835                ]);
836                let mut record = UnifiedRecord::new();
837                record.set("name", Value::text(name.clone()));
838                record.set("count", Value::UnsignedInteger(filter.count() as u64));
839                record.set("load_factor", Value::Float(filter.load_factor()));
840                record.set(
841                    "memory_bytes",
842                    Value::UnsignedInteger(filter.memory_bytes() as u64),
843                );
844                result.push(record);
845                Ok(RuntimeQueryResult {
846                    query: raw_query.to_string(),
847                    mode: QueryMode::Sql,
848                    statement: "filter_info",
849                    engine: "runtime-probabilistic",
850                    result,
851                    affected_rows: 0,
852                    statement_type: "select",
853                })
854            }
855            ProbabilisticCommand::DropFilter { name, if_exists } => {
856                let mut filters = probabilistic_write(
857                    &self.inner.probabilistic.filters,
858                    "probabilistic filter store",
859                );
860                if filters.remove(name).is_none() {
861                    if *if_exists {
862                        return Ok(RuntimeQueryResult::ok_message(
863                            raw_query.to_string(),
864                            &format!("FILTER '{}' does not exist", name),
865                            "drop",
866                        ));
867                    }
868                    return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
869                }
870                self.drop_probabilistic_catalog_entry(name)?;
871                self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
872                Ok(RuntimeQueryResult::ok_message(
873                    raw_query.to_string(),
874                    &format!("FILTER '{}' dropped", name),
875                    "drop",
876                ))
877            }
878        }
879    }
880}
881
882fn parse_probabilistic_read_projection(
883    projection: &Projection,
884    index: usize,
885) -> RedDBResult<Option<ProbabilisticReadProjection>> {
886    if let Some(column) = projection_unqualified_column(projection) {
887        if column.eq_ignore_ascii_case("CARDINALITY") {
888            return Ok(Some(ProbabilisticReadProjection::Cardinality {
889                label: probabilistic_projection_label(projection, "cardinality", index),
890            }));
891        }
892    }
893
894    let Some((function, args)) = projection_function(projection) else {
895        return Ok(None);
896    };
897    if function.eq_ignore_ascii_case("FREQ") {
898        let element = projection_single_text_arg(function, args)?;
899        return Ok(Some(ProbabilisticReadProjection::Freq {
900            element,
901            label: probabilistic_projection_label(projection, "freq", index),
902        }));
903    }
904    if function.eq_ignore_ascii_case("CONTAINS") {
905        let element = projection_single_text_arg(function, args)?;
906        return Ok(Some(ProbabilisticReadProjection::Contains {
907            element,
908            label: probabilistic_projection_label(projection, "contains", index),
909        }));
910    }
911
912    Ok(None)
913}
914
915fn validate_probabilistic_read_model(
916    collection: &str,
917    actual_model: crate::catalog::CollectionModel,
918    projections: &[ProbabilisticReadProjection],
919) -> RedDBResult<()> {
920    for projection in projections {
921        let expected_model = match projection {
922            ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
923            ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
924            ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
925        };
926        if actual_model != expected_model {
927            return Err(RedDBError::Query(format!(
928                "{} is only supported for {} collections; '{}' is {}",
929                probabilistic_projection_form(projection),
930                crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
931                collection,
932                crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
933            )));
934        }
935    }
936    Ok(())
937}
938
939impl RedDBRuntime {
940    fn materialize_probabilistic_select_row(
941        &self,
942        collection: &str,
943        projections: &[ProbabilisticReadProjection],
944    ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
945        let mut columns = Vec::with_capacity(projections.len());
946        let mut record = UnifiedRecord::new();
947        for projection in projections {
948            match projection {
949                ProbabilisticReadProjection::Cardinality { label } => {
950                    let hlls = probabilistic_read(
951                        &self.inner.probabilistic.hlls,
952                        "probabilistic HLL store",
953                    );
954                    let hll = hlls.get(collection).ok_or_else(|| {
955                        RedDBError::NotFound(format!("HLL '{}' not found", collection))
956                    })?;
957                    columns.push(label.clone());
958                    record.set(label, Value::UnsignedInteger(hll.count()));
959                }
960                ProbabilisticReadProjection::Freq { element, label } => {
961                    let sketches = probabilistic_read(
962                        &self.inner.probabilistic.sketches,
963                        "probabilistic sketch store",
964                    );
965                    let sketch = sketches.get(collection).ok_or_else(|| {
966                        RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
967                    })?;
968                    columns.push(label.clone());
969                    record.set(
970                        label,
971                        Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
972                    );
973                }
974                ProbabilisticReadProjection::Contains { element, label } => {
975                    let filters = probabilistic_read(
976                        &self.inner.probabilistic.filters,
977                        "probabilistic filter store",
978                    );
979                    let filter = filters.get(collection).ok_or_else(|| {
980                        RedDBError::NotFound(format!("FILTER '{}' not found", collection))
981                    })?;
982                    columns.push(label.clone());
983                    record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
984                }
985            }
986        }
987        Ok((columns, record))
988    }
989}
990
991fn probabilistic_select_row_visible(
992    runtime: &RedDBRuntime,
993    query: &TableQuery,
994    record: &UnifiedRecord,
995) -> bool {
996    if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
997        return false;
998    }
999    let table_name = query.table.as_str();
1000    let table_alias = query.alias.as_deref().unwrap_or(table_name);
1001    crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
1002        super::join_filter::evaluate_runtime_filter_with_db(
1003            Some(&runtime.inner.db),
1004            record,
1005            &filter,
1006            Some(table_name),
1007            Some(table_alias),
1008        )
1009    })
1010}
1011
1012fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1013    match projection {
1014        Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1015            Some(column.as_str())
1016        }
1017        Projection::Column(column) => Some(column.as_str()),
1018        Projection::Alias(column, _) => Some(column.as_str()),
1019        _ => None,
1020    }
1021}
1022
1023fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1024    match projection {
1025        Projection::Function(name, args) => {
1026            let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1027            Some((function, args.as_slice()))
1028        }
1029        _ => None,
1030    }
1031}
1032
1033fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1034    if args.len() != 1 {
1035        return Err(RedDBError::Query(format!(
1036            "{function}(...) expects exactly one string literal"
1037        )));
1038    }
1039    match &args[0] {
1040        Projection::Column(column) => column
1041            .strip_prefix("LIT:")
1042            .map(ToString::to_string)
1043            .ok_or_else(|| {
1044                RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1045            }),
1046        _ => Err(RedDBError::Query(format!(
1047            "{function}(...) expects a string literal argument"
1048        ))),
1049    }
1050}
1051
1052fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1053    match projection {
1054        Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1055            if alias.eq_ignore_ascii_case(column) =>
1056        {
1057            numbered_probabilistic_label(base, index)
1058        }
1059        Projection::Field(_, Some(alias)) => alias.clone(),
1060        Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1061            numbered_probabilistic_label(base, index)
1062        }
1063        Projection::Alias(_, alias) => alias.clone(),
1064        Projection::Function(name, _) => name
1065            .split_once(':')
1066            .map(|(_, alias)| {
1067                if is_generated_probabilistic_function_label(alias, base) {
1068                    numbered_probabilistic_label(base, index)
1069                } else {
1070                    alias.to_string()
1071                }
1072            })
1073            .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1074        _ => numbered_probabilistic_label(base, index),
1075    }
1076}
1077
/// Reports whether `alias` has the shape `base(`…: it starts with `base`
/// (ASCII case-insensitive) and is immediately followed by an opening
/// parenthesis.
///
/// Returns `false` when `alias` is shorter than `base` or when `base.len()`
/// does not fall on a character boundary of `alias`.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    let cut = base.len();
    match (alias.get(..cut), alias.get(cut..)) {
        (Some(head), Some(tail)) => head.eq_ignore_ascii_case(base) && tail.starts_with('('),
        _ => false,
    }
}
1084
/// Builds the default label for the projection at position `index`.
///
/// The first projection (`index == 0`) uses `base` as-is; later ones get a
/// one-based suffix, e.g. `base_2` for `index == 1`.
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        position => format!("{base}_{}", position + 1),
    }
}
1092
1093fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1094    match projection {
1095        ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1096        ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1097        ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1098    }
1099}