Skip to main content

reddb_server/runtime/
impl_probabilistic.rs

1//! Execution of probabilistic data structure commands (HLL, SKETCH, FILTER)
2
3use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
/// Config-tree key prefix for persisted HyperLogLog state; the hex-encoded structure name is
/// appended to form the full key. Part of the persisted format: changing it hides existing state.
const PROB_HLL_STATE_PREFIX: &str = "red.probabilistic.hll.";
/// Config-tree key prefix for persisted Count-Min sketch state (hex-encoded name appended).
const PROB_SKETCH_STATE_PREFIX: &str = "red.probabilistic.sketch.";
/// Config-tree key prefix for persisted Cuckoo filter state (hex-encoded name appended).
const PROB_FILTER_STATE_PREFIX: &str = "red.probabilistic.filter.";
9
10fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
11    lock.read()
12}
13
14fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
15    lock.write()
16}
17
18fn probabilistic_collection_contract(
19    name: &str,
20    model: crate::catalog::CollectionModel,
21) -> crate::physical::CollectionContract {
22    let now = crate::utils::now_unix_millis() as u128;
23    crate::physical::CollectionContract {
24        name: name.to_string(),
25        declared_model: model,
26        schema_mode: crate::catalog::SchemaMode::Dynamic,
27        origin: crate::physical::ContractOrigin::Explicit,
28        version: 1,
29        created_at_unix_ms: now,
30        updated_at_unix_ms: now,
31        default_ttl_ms: None,
32        vector_dimension: None,
33        vector_metric: None,
34        context_index_fields: Vec::new(),
35        declared_columns: Vec::new(),
36        table_def: None,
37        timestamps_enabled: false,
38        context_index_enabled: false,
39        metrics_raw_retention_ms: None,
40        metrics_rollup_policies: Vec::new(),
41        metrics_tenant_identity: None,
42        metrics_namespace: None,
43        append_only: false,
44        subscriptions: Vec::new(),
45    }
46}
47
/// One probabilistic read form recognised in a `SELECT` projection list.
///
/// Every variant carries a `label` (presumably the output column name — confirm against
/// `materialize_probabilistic_select_row`). `Freq` and `Contains` also carry the `element`
/// argument that the read form looks up.
enum ProbabilisticReadProjection {
    /// The `CARDINALITY` read form.
    Cardinality {
        label: String,
    },
    /// The `FREQ(element)` read form.
    Freq {
        element: String,
        label: String,
    },
    /// The `CONTAINS(element)` read form.
    Contains {
        element: String,
        label: String,
    },
}
53
54impl RedDBRuntime {
55    pub(crate) fn load_probabilistic_state(&self) -> RedDBResult<()> {
56        {
57            let entries = self.latest_probabilistic_state_entries(PROB_HLL_STATE_PREFIX);
58            let mut hlls =
59                probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
60            for (name, data_hex) in entries {
61                let bytes = hex::decode(&data_hex).map_err(|err| {
62                    RedDBError::Internal(format!("invalid persisted HLL state for '{name}': {err}"))
63                })?;
64                let Some(hll) =
65                    crate::storage::primitives::hyperloglog::HyperLogLog::from_bytes(bytes)
66                else {
67                    return Err(RedDBError::Internal(format!(
68                        "invalid persisted HLL state for '{name}'"
69                    )));
70                };
71                hlls.insert(name, hll);
72            }
73        }
74
75        {
76            let entries = self.latest_probabilistic_state_entries(PROB_SKETCH_STATE_PREFIX);
77            let mut sketches = probabilistic_write(
78                &self.inner.probabilistic.sketches,
79                "probabilistic sketch store",
80            );
81            for (name, data_hex) in entries {
82                let bytes = hex::decode(&data_hex).map_err(|err| {
83                    RedDBError::Internal(format!(
84                        "invalid persisted SKETCH state for '{name}': {err}"
85                    ))
86                })?;
87                let sketch =
88                    crate::storage::primitives::count_min_sketch::CountMinSketch::from_bytes(
89                        &bytes,
90                    )
91                    .ok_or_else(|| {
92                        RedDBError::Internal(format!("invalid persisted SKETCH state for '{name}'"))
93                    })?;
94                sketches.insert(name, sketch);
95            }
96        }
97
98        {
99            let entries = self.latest_probabilistic_state_entries(PROB_FILTER_STATE_PREFIX);
100            let mut filters = probabilistic_write(
101                &self.inner.probabilistic.filters,
102                "probabilistic filter store",
103            );
104            for (name, data_hex) in entries {
105                let bytes = hex::decode(&data_hex).map_err(|err| {
106                    RedDBError::Internal(format!(
107                        "invalid persisted FILTER state for '{name}': {err}"
108                    ))
109                })?;
110                let filter =
111                    crate::storage::primitives::cuckoo_filter::CuckooFilter::from_bytes(&bytes)
112                        .ok_or_else(|| {
113                            RedDBError::Internal(format!(
114                                "invalid persisted FILTER state for '{name}'"
115                            ))
116                        })?;
117                filters.insert(name, filter);
118            }
119        }
120
121        Ok(())
122    }
123
124    fn latest_probabilistic_state_entries(&self, prefix: &str) -> Vec<(String, String)> {
125        let Some(manager) = self.inner.db.store().get_collection("red_config") else {
126            return Vec::new();
127        };
128        let mut latest: std::collections::HashMap<String, (u64, Option<String>)> =
129            std::collections::HashMap::new();
130        for entity in manager.query_all(|_| true) {
131            let EntityData::Row(row) = &entity.data else {
132                continue;
133            };
134            let Some(named) = &row.named else {
135                continue;
136            };
137            let Some(Value::Text(key)) = named.get("key") else {
138                continue;
139            };
140            let Some(encoded_name) = key.strip_prefix(prefix) else {
141                continue;
142            };
143            let value = match named.get("value") {
144                Some(Value::Text(value)) => Some(value.to_string()),
145                Some(Value::Null) => None,
146                _ => continue,
147            };
148            let entity_id = entity.id.raw();
149            match latest.get(encoded_name) {
150                Some((existing_id, _)) if *existing_id > entity_id => {}
151                _ => {
152                    latest.insert(encoded_name.to_string(), (entity_id, value));
153                }
154            }
155        }
156
157        latest
158            .into_iter()
159            .filter_map(|(encoded_name, (_, value))| {
160                let value = value?;
161                let bytes = hex::decode(encoded_name).ok()?;
162                let name = String::from_utf8(bytes).ok()?;
163                Some((name, value))
164            })
165            .collect()
166    }
167
168    fn persist_probabilistic_blob(
169        &self,
170        prefix: &str,
171        name: &str,
172        bytes: &[u8],
173    ) -> RedDBResult<()> {
174        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
175        self.inner
176            .db
177            .store()
178            .set_config_tree(&key, &crate::serde_json::Value::String(hex::encode(bytes)));
179        Ok(())
180    }
181
182    fn delete_probabilistic_blob(&self, prefix: &str, name: &str) -> RedDBResult<()> {
183        let key = format!("{prefix}{}", hex::encode(name.as_bytes()));
184        self.inner
185            .db
186            .store()
187            .set_config_tree(&key, &crate::serde_json::Value::Null);
188        Ok(())
189    }
190
191    fn create_probabilistic_catalog_entry(
192        &self,
193        name: &str,
194        model: crate::catalog::CollectionModel,
195    ) -> RedDBResult<()> {
196        let store = self.inner.db.store();
197        store
198            .create_collection(name)
199            .map_err(|err| RedDBError::Internal(err.to_string()))?;
200        self.inner
201            .db
202            .save_collection_contract(probabilistic_collection_contract(name, model))
203            .map_err(|err| RedDBError::Internal(err.to_string()))?;
204        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
205            store.set_config_tree(
206                &format!("red.collection_tenants.{name}"),
207                &crate::serde_json::Value::String(tenant_id),
208            );
209        }
210        self.inner
211            .db
212            .persist_metadata()
213            .map_err(|err| RedDBError::Internal(err.to_string()))?;
214        self.invalidate_result_cache();
215        Ok(())
216    }
217
218    fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
219        let store = self.inner.db.store();
220        if store.get_collection(name).is_some() {
221            store
222                .drop_collection(name)
223                .map_err(|err| RedDBError::Internal(err.to_string()))?;
224        }
225        self.inner
226            .db
227            .remove_collection_contract(name)
228            .map_err(|err| RedDBError::Internal(err.to_string()))?;
229        self.inner
230            .db
231            .persist_metadata()
232            .map_err(|err| RedDBError::Internal(err.to_string()))?;
233        self.invalidate_result_cache();
234        Ok(())
235    }
236
237    pub(crate) fn execute_probabilistic_select(
238        &self,
239        query: &TableQuery,
240    ) -> RedDBResult<Option<UnifiedResult>> {
241        let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
242        let mut read_projections = Vec::new();
243        for projection in &projections {
244            if let Some(read_projection) =
245                parse_probabilistic_read_projection(projection, read_projections.len())?
246            {
247                read_projections.push(read_projection);
248            }
249        }
250
251        let Some(actual_model) = self
252            .inner
253            .db
254            .collection_contract(&query.table)
255            .map(|contract| contract.declared_model)
256        else {
257            return if read_projections.is_empty() {
258                Ok(None)
259            } else {
260                Err(RedDBError::NotFound(format!(
261                    "probabilistic collection '{}' not found",
262                    query.table
263                )))
264            };
265        };
266
267        let is_probabilistic_model = matches!(
268            actual_model,
269            crate::catalog::CollectionModel::Hll
270                | crate::catalog::CollectionModel::Sketch
271                | crate::catalog::CollectionModel::Filter
272        );
273        if read_projections.is_empty() {
274            return if is_probabilistic_model {
275                Err(RedDBError::Query(format!(
276                    "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
277                    query.table
278                )))
279            } else {
280                Ok(None)
281            };
282        }
283
284        validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
285        let (columns, record) =
286            self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
287        let mut result = UnifiedResult::with_columns(columns);
288        if probabilistic_select_row_visible(self, query, &record) {
289            result.push(record);
290        }
291        Ok(Some(result))
292    }
293
294    pub fn execute_probabilistic_command(
295        &self,
296        raw_query: &str,
297        cmd: &ProbabilisticCommand,
298    ) -> RedDBResult<RuntimeQueryResult> {
299        // Mixed read/write surface: count/info/check are read-side and
300        // must remain available on read-only replicas; create/add/
301        // merge/delete/drop are mutations and must go through the gate.
302        let is_mutation = matches!(
303            cmd,
304            ProbabilisticCommand::CreateHll { .. }
305                | ProbabilisticCommand::HllAdd { .. }
306                | ProbabilisticCommand::HllMerge { .. }
307                | ProbabilisticCommand::DropHll { .. }
308                | ProbabilisticCommand::CreateSketch { .. }
309                | ProbabilisticCommand::SketchAdd { .. }
310                | ProbabilisticCommand::SketchMerge { .. }
311                | ProbabilisticCommand::DropSketch { .. }
312                | ProbabilisticCommand::CreateFilter { .. }
313                | ProbabilisticCommand::FilterAdd { .. }
314                | ProbabilisticCommand::FilterDelete { .. }
315                | ProbabilisticCommand::DropFilter { .. }
316        );
317        if is_mutation {
318            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
319        }
320        match cmd {
321            // ── HyperLogLog ──────────────────────────────────────────
322            ProbabilisticCommand::CreateHll {
323                name,
324                precision,
325                if_not_exists,
326            } => {
327                let mut hlls =
328                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
329                if hlls.contains_key(name) {
330                    if *if_not_exists {
331                        return Ok(RuntimeQueryResult::ok_message(
332                            raw_query.to_string(),
333                            &format!("HLL '{}' already exists", name),
334                            "create",
335                        ));
336                    }
337                    return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
338                }
339                let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
340                    *precision,
341                )
342                .ok_or_else(|| {
343                    RedDBError::Query(format!(
344                        "HLL precision must be between 4 and 18, got {precision}"
345                    ))
346                })?;
347                self.create_probabilistic_catalog_entry(
348                    name,
349                    crate::catalog::CollectionModel::Hll,
350                )?;
351                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
352                hlls.insert(name.clone(), hll);
353                Ok(RuntimeQueryResult::ok_message(
354                    raw_query.to_string(),
355                    &format!("HLL '{}' created", name),
356                    "create",
357                ))
358            }
359            ProbabilisticCommand::HllAdd { name, elements } => {
360                let mut hlls =
361                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
362                let hll = hlls
363                    .get_mut(name)
364                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
365                for elem in elements {
366                    hll.add(elem.as_bytes());
367                }
368                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, name, hll.as_bytes())?;
369                Ok(RuntimeQueryResult::ok_message(
370                    raw_query.to_string(),
371                    &format!("{} element(s) added to HLL '{}'", elements.len(), name),
372                    "insert",
373                ))
374            }
375            ProbabilisticCommand::HllCount { names } => {
376                let hlls =
377                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
378                if names.len() == 1 {
379                    let hll = hlls.get(&names[0]).ok_or_else(|| {
380                        RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
381                    })?;
382                    let count = hll.count();
383                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
384                    let mut record = UnifiedRecord::new();
385                    record.set("count", Value::UnsignedInteger(count));
386                    result.push(record);
387                    Ok(RuntimeQueryResult {
388                        query: raw_query.to_string(),
389                        mode: QueryMode::Sql,
390                        statement: "hll_count",
391                        engine: "runtime-probabilistic",
392                        result,
393                        affected_rows: 0,
394                        statement_type: "select",
395                    })
396                } else {
397                    // Multi-HLL count = union count
398                    let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
399                    for name in names {
400                        let hll = hlls.get(name).ok_or_else(|| {
401                            RedDBError::NotFound(format!("HLL '{}' not found", name))
402                        })?;
403                        merged.merge(hll);
404                    }
405                    let count = merged.count();
406                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
407                    let mut record = UnifiedRecord::new();
408                    record.set("count", Value::UnsignedInteger(count));
409                    result.push(record);
410                    Ok(RuntimeQueryResult {
411                        query: raw_query.to_string(),
412                        mode: QueryMode::Sql,
413                        statement: "hll_count",
414                        engine: "runtime-probabilistic",
415                        result,
416                        affected_rows: 0,
417                        statement_type: "select",
418                    })
419                }
420            }
421            ProbabilisticCommand::HllMerge { dest, sources } => {
422                let mut hlls =
423                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
424                let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
425                for src in sources {
426                    let hll = hlls
427                        .get(src)
428                        .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
429                    merged.merge(hll);
430                }
431                self.persist_probabilistic_blob(PROB_HLL_STATE_PREFIX, dest, merged.as_bytes())?;
432                hlls.insert(dest.clone(), merged);
433                Ok(RuntimeQueryResult::ok_message(
434                    raw_query.to_string(),
435                    &format!(
436                        "HLL '{}' created from merge of {}",
437                        dest,
438                        sources.join(", ")
439                    ),
440                    "create",
441                ))
442            }
443            ProbabilisticCommand::HllInfo { name } => {
444                let hlls =
445                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
446                let hll = hlls
447                    .get(name)
448                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
449                let mut result = UnifiedResult::with_columns(vec![
450                    "name".into(),
451                    "precision".into(),
452                    "count".into(),
453                    "memory_bytes".into(),
454                ]);
455                let mut record = UnifiedRecord::new();
456                record.set("name", Value::text(name.clone()));
457                record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
458                record.set("count", Value::UnsignedInteger(hll.count()));
459                record.set(
460                    "memory_bytes",
461                    Value::UnsignedInteger(hll.memory_bytes() as u64),
462                );
463                result.push(record);
464                Ok(RuntimeQueryResult {
465                    query: raw_query.to_string(),
466                    mode: QueryMode::Sql,
467                    statement: "hll_info",
468                    engine: "runtime-probabilistic",
469                    result,
470                    affected_rows: 0,
471                    statement_type: "select",
472                })
473            }
474            ProbabilisticCommand::DropHll { name, if_exists } => {
475                let mut hlls =
476                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
477                if hlls.remove(name).is_none() {
478                    if *if_exists {
479                        return Ok(RuntimeQueryResult::ok_message(
480                            raw_query.to_string(),
481                            &format!("HLL '{}' does not exist", name),
482                            "drop",
483                        ));
484                    }
485                    return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
486                }
487                self.drop_probabilistic_catalog_entry(name)?;
488                self.delete_probabilistic_blob(PROB_HLL_STATE_PREFIX, name)?;
489                Ok(RuntimeQueryResult::ok_message(
490                    raw_query.to_string(),
491                    &format!("HLL '{}' dropped", name),
492                    "drop",
493                ))
494            }
495
496            // ── Count-Min Sketch ───────────────────────────────────────
497            ProbabilisticCommand::CreateSketch {
498                name,
499                width,
500                depth,
501                if_not_exists,
502            } => {
503                let mut sketches = probabilistic_write(
504                    &self.inner.probabilistic.sketches,
505                    "probabilistic sketch store",
506                );
507                if sketches.contains_key(name) {
508                    if *if_not_exists {
509                        return Ok(RuntimeQueryResult::ok_message(
510                            raw_query.to_string(),
511                            &format!("SKETCH '{}' already exists", name),
512                            "create",
513                        ));
514                    }
515                    return Err(RedDBError::Query(format!(
516                        "SKETCH '{}' already exists",
517                        name
518                    )));
519                }
520                self.create_probabilistic_catalog_entry(
521                    name,
522                    crate::catalog::CollectionModel::Sketch,
523                )?;
524                let sketch = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
525                    *width, *depth,
526                );
527                self.persist_probabilistic_blob(
528                    PROB_SKETCH_STATE_PREFIX,
529                    name,
530                    &sketch.as_bytes(),
531                )?;
532                sketches.insert(name.clone(), sketch);
533                Ok(RuntimeQueryResult::ok_message(
534                    raw_query.to_string(),
535                    &format!(
536                        "SKETCH '{}' created (width={}, depth={})",
537                        name, width, depth
538                    ),
539                    "create",
540                ))
541            }
542            ProbabilisticCommand::SketchAdd {
543                name,
544                element,
545                count,
546            } => {
547                let mut sketches = probabilistic_write(
548                    &self.inner.probabilistic.sketches,
549                    "probabilistic sketch store",
550                );
551                let sketch = sketches
552                    .get_mut(name)
553                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
554                sketch.add(element.as_bytes(), *count);
555                self.persist_probabilistic_blob(
556                    PROB_SKETCH_STATE_PREFIX,
557                    name,
558                    &sketch.as_bytes(),
559                )?;
560                Ok(RuntimeQueryResult::ok_message(
561                    raw_query.to_string(),
562                    &format!("added {} to SKETCH '{}'", count, name),
563                    "insert",
564                ))
565            }
566            ProbabilisticCommand::SketchCount { name, element } => {
567                let sketches = probabilistic_read(
568                    &self.inner.probabilistic.sketches,
569                    "probabilistic sketch store",
570                );
571                let sketch = sketches
572                    .get(name)
573                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
574                let estimate = sketch.estimate(element.as_bytes());
575                let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
576                let mut record = UnifiedRecord::new();
577                record.set("estimate", Value::UnsignedInteger(estimate));
578                result.push(record);
579                Ok(RuntimeQueryResult {
580                    query: raw_query.to_string(),
581                    mode: QueryMode::Sql,
582                    statement: "sketch_count",
583                    engine: "runtime-probabilistic",
584                    result,
585                    affected_rows: 0,
586                    statement_type: "select",
587                })
588            }
589            ProbabilisticCommand::SketchMerge { dest, sources } => {
590                let mut sketches = probabilistic_write(
591                    &self.inner.probabilistic.sketches,
592                    "probabilistic sketch store",
593                );
594                let first_src = sketches.get(&sources[0]).ok_or_else(|| {
595                    RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
596                })?;
597                let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
598                    first_src.width(),
599                    first_src.depth(),
600                );
601                for src in sources {
602                    let sketch = sketches.get(src).ok_or_else(|| {
603                        RedDBError::NotFound(format!("SKETCH '{}' not found", src))
604                    })?;
605                    if !merged.merge(sketch) {
606                        return Err(RedDBError::Query(format!(
607                            "SKETCH '{}' has incompatible dimensions",
608                            src
609                        )));
610                    }
611                }
612                self.persist_probabilistic_blob(
613                    PROB_SKETCH_STATE_PREFIX,
614                    dest,
615                    &merged.as_bytes(),
616                )?;
617                sketches.insert(dest.clone(), merged);
618                Ok(RuntimeQueryResult::ok_message(
619                    raw_query.to_string(),
620                    &format!(
621                        "SKETCH '{}' created from merge of {}",
622                        dest,
623                        sources.join(", ")
624                    ),
625                    "create",
626                ))
627            }
628            ProbabilisticCommand::SketchInfo { name } => {
629                let sketches = probabilistic_read(
630                    &self.inner.probabilistic.sketches,
631                    "probabilistic sketch store",
632                );
633                let sketch = sketches
634                    .get(name)
635                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
636                let mut result = UnifiedResult::with_columns(vec![
637                    "name".into(),
638                    "width".into(),
639                    "depth".into(),
640                    "total".into(),
641                    "memory_bytes".into(),
642                ]);
643                let mut record = UnifiedRecord::new();
644                record.set("name", Value::text(name.clone()));
645                record.set("width", Value::UnsignedInteger(sketch.width() as u64));
646                record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
647                record.set("total", Value::UnsignedInteger(sketch.total()));
648                record.set(
649                    "memory_bytes",
650                    Value::UnsignedInteger(sketch.memory_bytes() as u64),
651                );
652                result.push(record);
653                Ok(RuntimeQueryResult {
654                    query: raw_query.to_string(),
655                    mode: QueryMode::Sql,
656                    statement: "sketch_info",
657                    engine: "runtime-probabilistic",
658                    result,
659                    affected_rows: 0,
660                    statement_type: "select",
661                })
662            }
663            ProbabilisticCommand::DropSketch { name, if_exists } => {
664                let mut sketches = probabilistic_write(
665                    &self.inner.probabilistic.sketches,
666                    "probabilistic sketch store",
667                );
668                if sketches.remove(name).is_none() {
669                    if *if_exists {
670                        return Ok(RuntimeQueryResult::ok_message(
671                            raw_query.to_string(),
672                            &format!("SKETCH '{}' does not exist", name),
673                            "drop",
674                        ));
675                    }
676                    return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
677                }
678                self.drop_probabilistic_catalog_entry(name)?;
679                self.delete_probabilistic_blob(PROB_SKETCH_STATE_PREFIX, name)?;
680                Ok(RuntimeQueryResult::ok_message(
681                    raw_query.to_string(),
682                    &format!("SKETCH '{}' dropped", name),
683                    "drop",
684                ))
685            }
686
687            // ── Cuckoo Filter ─────────────────────────────────────────
688            ProbabilisticCommand::CreateFilter {
689                name,
690                capacity,
691                if_not_exists,
692            } => {
693                let mut filters = probabilistic_write(
694                    &self.inner.probabilistic.filters,
695                    "probabilistic filter store",
696                );
697                if filters.contains_key(name) {
698                    if *if_not_exists {
699                        return Ok(RuntimeQueryResult::ok_message(
700                            raw_query.to_string(),
701                            &format!("FILTER '{}' already exists", name),
702                            "create",
703                        ));
704                    }
705                    return Err(RedDBError::Query(format!(
706                        "FILTER '{}' already exists",
707                        name
708                    )));
709                }
710                self.create_probabilistic_catalog_entry(
711                    name,
712                    crate::catalog::CollectionModel::Filter,
713                )?;
714                let filter =
715                    crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity);
716                self.persist_probabilistic_blob(
717                    PROB_FILTER_STATE_PREFIX,
718                    name,
719                    &filter.as_bytes(),
720                )?;
721                filters.insert(name.clone(), filter);
722                Ok(RuntimeQueryResult::ok_message(
723                    raw_query.to_string(),
724                    &format!("FILTER '{}' created (capacity={})", name, capacity),
725                    "create",
726                ))
727            }
728            ProbabilisticCommand::FilterAdd { name, element } => {
729                let mut filters = probabilistic_write(
730                    &self.inner.probabilistic.filters,
731                    "probabilistic filter store",
732                );
733                let filter = filters
734                    .get_mut(name)
735                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
736                if !filter.insert(element.as_bytes()) {
737                    return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
738                }
739                self.persist_probabilistic_blob(
740                    PROB_FILTER_STATE_PREFIX,
741                    name,
742                    &filter.as_bytes(),
743                )?;
744                Ok(RuntimeQueryResult::ok_message(
745                    raw_query.to_string(),
746                    &format!("element added to FILTER '{}'", name),
747                    "insert",
748                ))
749            }
750            ProbabilisticCommand::FilterCheck { name, element } => {
751                let filters = probabilistic_read(
752                    &self.inner.probabilistic.filters,
753                    "probabilistic filter store",
754                );
755                let filter = filters
756                    .get(name)
757                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
758                let exists = filter.contains(element.as_bytes());
759                let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
760                let mut record = UnifiedRecord::new();
761                record.set("exists", Value::Boolean(exists));
762                result.push(record);
763                Ok(RuntimeQueryResult {
764                    query: raw_query.to_string(),
765                    mode: QueryMode::Sql,
766                    statement: "filter_check",
767                    engine: "runtime-probabilistic",
768                    result,
769                    affected_rows: 0,
770                    statement_type: "select",
771                })
772            }
773            ProbabilisticCommand::FilterDelete { name, element } => {
774                let mut filters = probabilistic_write(
775                    &self.inner.probabilistic.filters,
776                    "probabilistic filter store",
777                );
778                let filter = filters
779                    .get_mut(name)
780                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
781                let removed = filter.delete(element.as_bytes());
782                self.persist_probabilistic_blob(
783                    PROB_FILTER_STATE_PREFIX,
784                    name,
785                    &filter.as_bytes(),
786                )?;
787                Ok(RuntimeQueryResult::ok_message(
788                    raw_query.to_string(),
789                    &format!(
790                        "element {} from FILTER '{}'",
791                        if removed { "deleted" } else { "not found in" },
792                        name
793                    ),
794                    "delete",
795                ))
796            }
797            ProbabilisticCommand::FilterCount { name } => {
798                let filters = probabilistic_read(
799                    &self.inner.probabilistic.filters,
800                    "probabilistic filter store",
801                );
802                let filter = filters
803                    .get(name)
804                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
805                let mut result = UnifiedResult::with_columns(vec!["count".into()]);
806                let mut record = UnifiedRecord::new();
807                record.set("count", Value::UnsignedInteger(filter.count() as u64));
808                result.push(record);
809                Ok(RuntimeQueryResult {
810                    query: raw_query.to_string(),
811                    mode: QueryMode::Sql,
812                    statement: "filter_count",
813                    engine: "runtime-probabilistic",
814                    result,
815                    affected_rows: 0,
816                    statement_type: "select",
817                })
818            }
819            ProbabilisticCommand::FilterInfo { name } => {
820                let filters = probabilistic_read(
821                    &self.inner.probabilistic.filters,
822                    "probabilistic filter store",
823                );
824                let filter = filters
825                    .get(name)
826                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
827                let mut result = UnifiedResult::with_columns(vec![
828                    "name".into(),
829                    "count".into(),
830                    "load_factor".into(),
831                    "memory_bytes".into(),
832                ]);
833                let mut record = UnifiedRecord::new();
834                record.set("name", Value::text(name.clone()));
835                record.set("count", Value::UnsignedInteger(filter.count() as u64));
836                record.set("load_factor", Value::Float(filter.load_factor()));
837                record.set(
838                    "memory_bytes",
839                    Value::UnsignedInteger(filter.memory_bytes() as u64),
840                );
841                result.push(record);
842                Ok(RuntimeQueryResult {
843                    query: raw_query.to_string(),
844                    mode: QueryMode::Sql,
845                    statement: "filter_info",
846                    engine: "runtime-probabilistic",
847                    result,
848                    affected_rows: 0,
849                    statement_type: "select",
850                })
851            }
852            ProbabilisticCommand::DropFilter { name, if_exists } => {
853                let mut filters = probabilistic_write(
854                    &self.inner.probabilistic.filters,
855                    "probabilistic filter store",
856                );
857                if filters.remove(name).is_none() {
858                    if *if_exists {
859                        return Ok(RuntimeQueryResult::ok_message(
860                            raw_query.to_string(),
861                            &format!("FILTER '{}' does not exist", name),
862                            "drop",
863                        ));
864                    }
865                    return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
866                }
867                self.drop_probabilistic_catalog_entry(name)?;
868                self.delete_probabilistic_blob(PROB_FILTER_STATE_PREFIX, name)?;
869                Ok(RuntimeQueryResult::ok_message(
870                    raw_query.to_string(),
871                    &format!("FILTER '{}' dropped", name),
872                    "drop",
873                ))
874            }
875        }
876    }
877}
878
879fn parse_probabilistic_read_projection(
880    projection: &Projection,
881    index: usize,
882) -> RedDBResult<Option<ProbabilisticReadProjection>> {
883    if let Some(column) = projection_unqualified_column(projection) {
884        if column.eq_ignore_ascii_case("CARDINALITY") {
885            return Ok(Some(ProbabilisticReadProjection::Cardinality {
886                label: probabilistic_projection_label(projection, "cardinality", index),
887            }));
888        }
889    }
890
891    let Some((function, args)) = projection_function(projection) else {
892        return Ok(None);
893    };
894    if function.eq_ignore_ascii_case("FREQ") {
895        let element = projection_single_text_arg(function, args)?;
896        return Ok(Some(ProbabilisticReadProjection::Freq {
897            element,
898            label: probabilistic_projection_label(projection, "freq", index),
899        }));
900    }
901    if function.eq_ignore_ascii_case("CONTAINS") {
902        let element = projection_single_text_arg(function, args)?;
903        return Ok(Some(ProbabilisticReadProjection::Contains {
904            element,
905            label: probabilistic_projection_label(projection, "contains", index),
906        }));
907    }
908
909    Ok(None)
910}
911
912fn validate_probabilistic_read_model(
913    collection: &str,
914    actual_model: crate::catalog::CollectionModel,
915    projections: &[ProbabilisticReadProjection],
916) -> RedDBResult<()> {
917    for projection in projections {
918        let expected_model = match projection {
919            ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
920            ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
921            ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
922        };
923        if actual_model != expected_model {
924            return Err(RedDBError::Query(format!(
925                "{} is only supported for {} collections; '{}' is {}",
926                probabilistic_projection_form(projection),
927                crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
928                collection,
929                crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
930            )));
931        }
932    }
933    Ok(())
934}
935
936impl RedDBRuntime {
937    fn materialize_probabilistic_select_row(
938        &self,
939        collection: &str,
940        projections: &[ProbabilisticReadProjection],
941    ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
942        let mut columns = Vec::with_capacity(projections.len());
943        let mut record = UnifiedRecord::new();
944        for projection in projections {
945            match projection {
946                ProbabilisticReadProjection::Cardinality { label } => {
947                    let hlls = probabilistic_read(
948                        &self.inner.probabilistic.hlls,
949                        "probabilistic HLL store",
950                    );
951                    let hll = hlls.get(collection).ok_or_else(|| {
952                        RedDBError::NotFound(format!("HLL '{}' not found", collection))
953                    })?;
954                    columns.push(label.clone());
955                    record.set(label, Value::UnsignedInteger(hll.count()));
956                }
957                ProbabilisticReadProjection::Freq { element, label } => {
958                    let sketches = probabilistic_read(
959                        &self.inner.probabilistic.sketches,
960                        "probabilistic sketch store",
961                    );
962                    let sketch = sketches.get(collection).ok_or_else(|| {
963                        RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
964                    })?;
965                    columns.push(label.clone());
966                    record.set(
967                        label,
968                        Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
969                    );
970                }
971                ProbabilisticReadProjection::Contains { element, label } => {
972                    let filters = probabilistic_read(
973                        &self.inner.probabilistic.filters,
974                        "probabilistic filter store",
975                    );
976                    let filter = filters.get(collection).ok_or_else(|| {
977                        RedDBError::NotFound(format!("FILTER '{}' not found", collection))
978                    })?;
979                    columns.push(label.clone());
980                    record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
981                }
982            }
983        }
984        Ok((columns, record))
985    }
986}
987
988fn probabilistic_select_row_visible(
989    runtime: &RedDBRuntime,
990    query: &TableQuery,
991    record: &UnifiedRecord,
992) -> bool {
993    if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
994        return false;
995    }
996    let table_name = query.table.as_str();
997    let table_alias = query.alias.as_deref().unwrap_or(table_name);
998    crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
999        super::join_filter::evaluate_runtime_filter_with_db(
1000            Some(&runtime.inner.db),
1001            record,
1002            &filter,
1003            Some(table_name),
1004            Some(table_alias),
1005        )
1006    })
1007}
1008
1009fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
1010    match projection {
1011        Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1012            Some(column.as_str())
1013        }
1014        Projection::Column(column) => Some(column.as_str()),
1015        Projection::Alias(column, _) => Some(column.as_str()),
1016        _ => None,
1017    }
1018}
1019
1020fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
1021    match projection {
1022        Projection::Function(name, args) => {
1023            let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
1024            Some((function, args.as_slice()))
1025        }
1026        _ => None,
1027    }
1028}
1029
1030fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
1031    if args.len() != 1 {
1032        return Err(RedDBError::Query(format!(
1033            "{function}(...) expects exactly one string literal"
1034        )));
1035    }
1036    match &args[0] {
1037        Projection::Column(column) => column
1038            .strip_prefix("LIT:")
1039            .map(ToString::to_string)
1040            .ok_or_else(|| {
1041                RedDBError::Query(format!("{function}(...) expects a string literal argument"))
1042            }),
1043        _ => Err(RedDBError::Query(format!(
1044            "{function}(...) expects a string literal argument"
1045        ))),
1046    }
1047}
1048
1049fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
1050    match projection {
1051        Projection::Field(FieldRef::TableColumn { column, .. }, Some(alias))
1052            if alias.eq_ignore_ascii_case(column) =>
1053        {
1054            numbered_probabilistic_label(base, index)
1055        }
1056        Projection::Field(_, Some(alias)) => alias.clone(),
1057        Projection::Alias(column, alias) if column.eq_ignore_ascii_case(alias) => {
1058            numbered_probabilistic_label(base, index)
1059        }
1060        Projection::Alias(_, alias) => alias.clone(),
1061        Projection::Function(name, _) => name
1062            .split_once(':')
1063            .map(|(_, alias)| {
1064                if is_generated_probabilistic_function_label(alias, base) {
1065                    numbered_probabilistic_label(base, index)
1066                } else {
1067                    alias.to_string()
1068                }
1069            })
1070            .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
1071        _ => numbered_probabilistic_label(base, index),
1072    }
1073}
1074
/// Reports whether `alias` looks like an auto-generated function label.
///
/// A generated label starts with `base` (ignoring ASCII case) and is immediately
/// followed by `(`, e.g. `FREQ('x')` for base `freq`.
///
/// Returns `false` when `alias` is shorter than `base`, or when `base.len()` does
/// not fall on a UTF-8 character boundary of `alias`.
fn is_generated_probabilistic_function_label(alias: &str, base: &str) -> bool {
    let split = base.len();
    match (alias.get(..split), alias.get(split..)) {
        (Some(head), Some(tail)) => head.eq_ignore_ascii_case(base) && tail.starts_with('('),
        _ => false,
    }
}
1081
/// Builds a default column label from `base` and the projection's zero-based `index`.
///
/// The first projection gets `base` unchanged. Later ones get a one-based suffix,
/// e.g. index 1 becomes `base_2`.
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => String::from(base),
        position => format!("{}_{}", base, position + 1),
    }
}
1089
1090fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
1091    match projection {
1092        ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
1093        ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
1094        ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
1095    }
1096}