Skip to main content

reddb_server/runtime/
impl_probabilistic.rs

1//! Execution of probabilistic data structure commands (HLL, SKETCH, FILTER)
2
3use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
6fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
7    lock.read()
8}
9
10fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
11    lock.write()
12}
13
14fn probabilistic_collection_contract(
15    name: &str,
16    model: crate::catalog::CollectionModel,
17) -> crate::physical::CollectionContract {
18    let now = crate::utils::now_unix_millis() as u128;
19    crate::physical::CollectionContract {
20        name: name.to_string(),
21        declared_model: model,
22        schema_mode: crate::catalog::SchemaMode::Dynamic,
23        origin: crate::physical::ContractOrigin::Explicit,
24        version: 1,
25        created_at_unix_ms: now,
26        updated_at_unix_ms: now,
27        default_ttl_ms: None,
28        vector_dimension: None,
29        vector_metric: None,
30        context_index_fields: Vec::new(),
31        declared_columns: Vec::new(),
32        table_def: None,
33        timestamps_enabled: false,
34        context_index_enabled: false,
35        append_only: false,
36        subscriptions: Vec::new(),
37    }
38}
39
/// A recognised probabilistic read form found in a `SELECT` projection list.
enum ProbabilisticReadProjection {
    /// `CARDINALITY` column form.
    Cardinality {
        /// Output column label for the value.
        label: String,
    },
    /// `FREQ(<element>)` function form.
    Freq {
        /// Element text passed as the single function argument.
        element: String,
        /// Output column label for the value.
        label: String,
    },
    /// `CONTAINS(<element>)` function form.
    Contains {
        /// Element text passed as the single function argument.
        element: String,
        /// Output column label for the value.
        label: String,
    },
}
45
46impl RedDBRuntime {
47    fn create_probabilistic_catalog_entry(
48        &self,
49        name: &str,
50        model: crate::catalog::CollectionModel,
51    ) -> RedDBResult<()> {
52        let store = self.inner.db.store();
53        store
54            .create_collection(name)
55            .map_err(|err| RedDBError::Internal(err.to_string()))?;
56        self.inner
57            .db
58            .save_collection_contract(probabilistic_collection_contract(name, model))
59            .map_err(|err| RedDBError::Internal(err.to_string()))?;
60        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
61            store.set_config_tree(
62                &format!("red.collection_tenants.{name}"),
63                &crate::serde_json::Value::String(tenant_id),
64            );
65        }
66        self.inner
67            .db
68            .persist_metadata()
69            .map_err(|err| RedDBError::Internal(err.to_string()))?;
70        self.invalidate_result_cache();
71        Ok(())
72    }
73
74    fn drop_probabilistic_catalog_entry(&self, name: &str) -> RedDBResult<()> {
75        let store = self.inner.db.store();
76        if store.get_collection(name).is_some() {
77            store
78                .drop_collection(name)
79                .map_err(|err| RedDBError::Internal(err.to_string()))?;
80        }
81        self.inner
82            .db
83            .remove_collection_contract(name)
84            .map_err(|err| RedDBError::Internal(err.to_string()))?;
85        self.inner
86            .db
87            .persist_metadata()
88            .map_err(|err| RedDBError::Internal(err.to_string()))?;
89        self.invalidate_result_cache();
90        Ok(())
91    }
92
93    pub(crate) fn execute_probabilistic_select(
94        &self,
95        query: &TableQuery,
96    ) -> RedDBResult<Option<UnifiedResult>> {
97        let projections = crate::storage::query::sql_lowering::effective_table_projections(query);
98        let mut read_projections = Vec::new();
99        for projection in &projections {
100            if let Some(read_projection) =
101                parse_probabilistic_read_projection(projection, read_projections.len())?
102            {
103                read_projections.push(read_projection);
104            }
105        }
106
107        let Some(actual_model) = self
108            .inner
109            .db
110            .collection_contract(&query.table)
111            .map(|contract| contract.declared_model)
112        else {
113            return if read_projections.is_empty() {
114                Ok(None)
115            } else {
116                Err(RedDBError::NotFound(format!(
117                    "probabilistic collection '{}' not found",
118                    query.table
119                )))
120            };
121        };
122
123        let is_probabilistic_model = matches!(
124            actual_model,
125            crate::catalog::CollectionModel::Hll
126                | crate::catalog::CollectionModel::Sketch
127                | crate::catalog::CollectionModel::Filter
128        );
129        if read_projections.is_empty() {
130            return if is_probabilistic_model {
131                Err(RedDBError::Query(format!(
132                    "probabilistic collection '{}' supports SELECT CARDINALITY, FREQ(...), or CONTAINS(...) read forms",
133                    query.table
134                )))
135            } else {
136                Ok(None)
137            };
138        }
139
140        validate_probabilistic_read_model(&query.table, actual_model, &read_projections)?;
141        let (columns, record) =
142            self.materialize_probabilistic_select_row(&query.table, &read_projections)?;
143        let mut result = UnifiedResult::with_columns(columns);
144        if probabilistic_select_row_visible(self, query, &record) {
145            result.push(record);
146        }
147        Ok(Some(result))
148    }
149
150    pub fn execute_probabilistic_command(
151        &self,
152        raw_query: &str,
153        cmd: &ProbabilisticCommand,
154    ) -> RedDBResult<RuntimeQueryResult> {
155        // Mixed read/write surface: count/info/check are read-side and
156        // must remain available on read-only replicas; create/add/
157        // merge/delete/drop are mutations and must go through the gate.
158        let is_mutation = matches!(
159            cmd,
160            ProbabilisticCommand::CreateHll { .. }
161                | ProbabilisticCommand::HllAdd { .. }
162                | ProbabilisticCommand::HllMerge { .. }
163                | ProbabilisticCommand::DropHll { .. }
164                | ProbabilisticCommand::CreateSketch { .. }
165                | ProbabilisticCommand::SketchAdd { .. }
166                | ProbabilisticCommand::SketchMerge { .. }
167                | ProbabilisticCommand::DropSketch { .. }
168                | ProbabilisticCommand::CreateFilter { .. }
169                | ProbabilisticCommand::FilterAdd { .. }
170                | ProbabilisticCommand::FilterDelete { .. }
171                | ProbabilisticCommand::DropFilter { .. }
172        );
173        if is_mutation {
174            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
175        }
176        match cmd {
177            // ── HyperLogLog ──────────────────────────────────────────
178            ProbabilisticCommand::CreateHll {
179                name,
180                precision,
181                if_not_exists,
182            } => {
183                let mut hlls =
184                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
185                if hlls.contains_key(name) {
186                    if *if_not_exists {
187                        return Ok(RuntimeQueryResult::ok_message(
188                            raw_query.to_string(),
189                            &format!("HLL '{}' already exists", name),
190                            "create",
191                        ));
192                    }
193                    return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
194                }
195                let hll = crate::storage::primitives::hyperloglog::HyperLogLog::with_precision(
196                    *precision,
197                )
198                .ok_or_else(|| {
199                    RedDBError::Query(format!(
200                        "HLL precision must be between 4 and 18, got {precision}"
201                    ))
202                })?;
203                self.create_probabilistic_catalog_entry(
204                    name,
205                    crate::catalog::CollectionModel::Hll,
206                )?;
207                hlls.insert(name.clone(), hll);
208                Ok(RuntimeQueryResult::ok_message(
209                    raw_query.to_string(),
210                    &format!("HLL '{}' created", name),
211                    "create",
212                ))
213            }
214            ProbabilisticCommand::HllAdd { name, elements } => {
215                let mut hlls =
216                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
217                let hll = hlls
218                    .get_mut(name)
219                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
220                for elem in elements {
221                    hll.add(elem.as_bytes());
222                }
223                Ok(RuntimeQueryResult::ok_message(
224                    raw_query.to_string(),
225                    &format!("{} element(s) added to HLL '{}'", elements.len(), name),
226                    "insert",
227                ))
228            }
229            ProbabilisticCommand::HllCount { names } => {
230                let hlls =
231                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
232                if names.len() == 1 {
233                    let hll = hlls.get(&names[0]).ok_or_else(|| {
234                        RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
235                    })?;
236                    let count = hll.count();
237                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
238                    let mut record = UnifiedRecord::new();
239                    record.set("count", Value::UnsignedInteger(count));
240                    result.push(record);
241                    Ok(RuntimeQueryResult {
242                        query: raw_query.to_string(),
243                        mode: QueryMode::Sql,
244                        statement: "hll_count",
245                        engine: "runtime-probabilistic",
246                        result,
247                        affected_rows: 0,
248                        statement_type: "select",
249                    })
250                } else {
251                    // Multi-HLL count = union count
252                    let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
253                    for name in names {
254                        let hll = hlls.get(name).ok_or_else(|| {
255                            RedDBError::NotFound(format!("HLL '{}' not found", name))
256                        })?;
257                        merged.merge(hll);
258                    }
259                    let count = merged.count();
260                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
261                    let mut record = UnifiedRecord::new();
262                    record.set("count", Value::UnsignedInteger(count));
263                    result.push(record);
264                    Ok(RuntimeQueryResult {
265                        query: raw_query.to_string(),
266                        mode: QueryMode::Sql,
267                        statement: "hll_count",
268                        engine: "runtime-probabilistic",
269                        result,
270                        affected_rows: 0,
271                        statement_type: "select",
272                    })
273                }
274            }
275            ProbabilisticCommand::HllMerge { dest, sources } => {
276                let mut hlls =
277                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
278                let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
279                for src in sources {
280                    let hll = hlls
281                        .get(src)
282                        .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
283                    merged.merge(hll);
284                }
285                hlls.insert(dest.clone(), merged);
286                Ok(RuntimeQueryResult::ok_message(
287                    raw_query.to_string(),
288                    &format!(
289                        "HLL '{}' created from merge of {}",
290                        dest,
291                        sources.join(", ")
292                    ),
293                    "create",
294                ))
295            }
296            ProbabilisticCommand::HllInfo { name } => {
297                let hlls =
298                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
299                let hll = hlls
300                    .get(name)
301                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
302                let mut result = UnifiedResult::with_columns(vec![
303                    "name".into(),
304                    "precision".into(),
305                    "count".into(),
306                    "memory_bytes".into(),
307                ]);
308                let mut record = UnifiedRecord::new();
309                record.set("name", Value::text(name.clone()));
310                record.set("precision", Value::UnsignedInteger(hll.precision() as u64));
311                record.set("count", Value::UnsignedInteger(hll.count()));
312                record.set(
313                    "memory_bytes",
314                    Value::UnsignedInteger(hll.memory_bytes() as u64),
315                );
316                result.push(record);
317                Ok(RuntimeQueryResult {
318                    query: raw_query.to_string(),
319                    mode: QueryMode::Sql,
320                    statement: "hll_info",
321                    engine: "runtime-probabilistic",
322                    result,
323                    affected_rows: 0,
324                    statement_type: "select",
325                })
326            }
327            ProbabilisticCommand::DropHll { name, if_exists } => {
328                let mut hlls =
329                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
330                if hlls.remove(name).is_none() {
331                    if *if_exists {
332                        return Ok(RuntimeQueryResult::ok_message(
333                            raw_query.to_string(),
334                            &format!("HLL '{}' does not exist", name),
335                            "drop",
336                        ));
337                    }
338                    return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
339                }
340                self.drop_probabilistic_catalog_entry(name)?;
341                Ok(RuntimeQueryResult::ok_message(
342                    raw_query.to_string(),
343                    &format!("HLL '{}' dropped", name),
344                    "drop",
345                ))
346            }
347
348            // ── Count-Min Sketch ───────────────────────────────────────
349            ProbabilisticCommand::CreateSketch {
350                name,
351                width,
352                depth,
353                if_not_exists,
354            } => {
355                let mut sketches = probabilistic_write(
356                    &self.inner.probabilistic.sketches,
357                    "probabilistic sketch store",
358                );
359                if sketches.contains_key(name) {
360                    if *if_not_exists {
361                        return Ok(RuntimeQueryResult::ok_message(
362                            raw_query.to_string(),
363                            &format!("SKETCH '{}' already exists", name),
364                            "create",
365                        ));
366                    }
367                    return Err(RedDBError::Query(format!(
368                        "SKETCH '{}' already exists",
369                        name
370                    )));
371                }
372                self.create_probabilistic_catalog_entry(
373                    name,
374                    crate::catalog::CollectionModel::Sketch,
375                )?;
376                sketches.insert(
377                    name.clone(),
378                    crate::storage::primitives::count_min_sketch::CountMinSketch::new(
379                        *width, *depth,
380                    ),
381                );
382                Ok(RuntimeQueryResult::ok_message(
383                    raw_query.to_string(),
384                    &format!(
385                        "SKETCH '{}' created (width={}, depth={})",
386                        name, width, depth
387                    ),
388                    "create",
389                ))
390            }
391            ProbabilisticCommand::SketchAdd {
392                name,
393                element,
394                count,
395            } => {
396                let mut sketches = probabilistic_write(
397                    &self.inner.probabilistic.sketches,
398                    "probabilistic sketch store",
399                );
400                let sketch = sketches
401                    .get_mut(name)
402                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
403                sketch.add(element.as_bytes(), *count);
404                Ok(RuntimeQueryResult::ok_message(
405                    raw_query.to_string(),
406                    &format!("added {} to SKETCH '{}'", count, name),
407                    "insert",
408                ))
409            }
410            ProbabilisticCommand::SketchCount { name, element } => {
411                let sketches = probabilistic_read(
412                    &self.inner.probabilistic.sketches,
413                    "probabilistic sketch store",
414                );
415                let sketch = sketches
416                    .get(name)
417                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
418                let estimate = sketch.estimate(element.as_bytes());
419                let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
420                let mut record = UnifiedRecord::new();
421                record.set("estimate", Value::UnsignedInteger(estimate));
422                result.push(record);
423                Ok(RuntimeQueryResult {
424                    query: raw_query.to_string(),
425                    mode: QueryMode::Sql,
426                    statement: "sketch_count",
427                    engine: "runtime-probabilistic",
428                    result,
429                    affected_rows: 0,
430                    statement_type: "select",
431                })
432            }
433            ProbabilisticCommand::SketchMerge { dest, sources } => {
434                let mut sketches = probabilistic_write(
435                    &self.inner.probabilistic.sketches,
436                    "probabilistic sketch store",
437                );
438                let first_src = sketches.get(&sources[0]).ok_or_else(|| {
439                    RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
440                })?;
441                let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
442                    first_src.width(),
443                    first_src.depth(),
444                );
445                for src in sources {
446                    let sketch = sketches.get(src).ok_or_else(|| {
447                        RedDBError::NotFound(format!("SKETCH '{}' not found", src))
448                    })?;
449                    if !merged.merge(sketch) {
450                        return Err(RedDBError::Query(format!(
451                            "SKETCH '{}' has incompatible dimensions",
452                            src
453                        )));
454                    }
455                }
456                sketches.insert(dest.clone(), merged);
457                Ok(RuntimeQueryResult::ok_message(
458                    raw_query.to_string(),
459                    &format!(
460                        "SKETCH '{}' created from merge of {}",
461                        dest,
462                        sources.join(", ")
463                    ),
464                    "create",
465                ))
466            }
467            ProbabilisticCommand::SketchInfo { name } => {
468                let sketches = probabilistic_read(
469                    &self.inner.probabilistic.sketches,
470                    "probabilistic sketch store",
471                );
472                let sketch = sketches
473                    .get(name)
474                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
475                let mut result = UnifiedResult::with_columns(vec![
476                    "name".into(),
477                    "width".into(),
478                    "depth".into(),
479                    "total".into(),
480                    "memory_bytes".into(),
481                ]);
482                let mut record = UnifiedRecord::new();
483                record.set("name", Value::text(name.clone()));
484                record.set("width", Value::UnsignedInteger(sketch.width() as u64));
485                record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
486                record.set("total", Value::UnsignedInteger(sketch.total()));
487                record.set(
488                    "memory_bytes",
489                    Value::UnsignedInteger(sketch.memory_bytes() as u64),
490                );
491                result.push(record);
492                Ok(RuntimeQueryResult {
493                    query: raw_query.to_string(),
494                    mode: QueryMode::Sql,
495                    statement: "sketch_info",
496                    engine: "runtime-probabilistic",
497                    result,
498                    affected_rows: 0,
499                    statement_type: "select",
500                })
501            }
502            ProbabilisticCommand::DropSketch { name, if_exists } => {
503                let mut sketches = probabilistic_write(
504                    &self.inner.probabilistic.sketches,
505                    "probabilistic sketch store",
506                );
507                if sketches.remove(name).is_none() {
508                    if *if_exists {
509                        return Ok(RuntimeQueryResult::ok_message(
510                            raw_query.to_string(),
511                            &format!("SKETCH '{}' does not exist", name),
512                            "drop",
513                        ));
514                    }
515                    return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
516                }
517                self.drop_probabilistic_catalog_entry(name)?;
518                Ok(RuntimeQueryResult::ok_message(
519                    raw_query.to_string(),
520                    &format!("SKETCH '{}' dropped", name),
521                    "drop",
522                ))
523            }
524
525            // ── Cuckoo Filter ─────────────────────────────────────────
526            ProbabilisticCommand::CreateFilter {
527                name,
528                capacity,
529                if_not_exists,
530            } => {
531                let mut filters = probabilistic_write(
532                    &self.inner.probabilistic.filters,
533                    "probabilistic filter store",
534                );
535                if filters.contains_key(name) {
536                    if *if_not_exists {
537                        return Ok(RuntimeQueryResult::ok_message(
538                            raw_query.to_string(),
539                            &format!("FILTER '{}' already exists", name),
540                            "create",
541                        ));
542                    }
543                    return Err(RedDBError::Query(format!(
544                        "FILTER '{}' already exists",
545                        name
546                    )));
547                }
548                self.create_probabilistic_catalog_entry(
549                    name,
550                    crate::catalog::CollectionModel::Filter,
551                )?;
552                filters.insert(
553                    name.clone(),
554                    crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity),
555                );
556                Ok(RuntimeQueryResult::ok_message(
557                    raw_query.to_string(),
558                    &format!("FILTER '{}' created (capacity={})", name, capacity),
559                    "create",
560                ))
561            }
562            ProbabilisticCommand::FilterAdd { name, element } => {
563                let mut filters = probabilistic_write(
564                    &self.inner.probabilistic.filters,
565                    "probabilistic filter store",
566                );
567                let filter = filters
568                    .get_mut(name)
569                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
570                if !filter.insert(element.as_bytes()) {
571                    return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
572                }
573                Ok(RuntimeQueryResult::ok_message(
574                    raw_query.to_string(),
575                    &format!("element added to FILTER '{}'", name),
576                    "insert",
577                ))
578            }
579            ProbabilisticCommand::FilterCheck { name, element } => {
580                let filters = probabilistic_read(
581                    &self.inner.probabilistic.filters,
582                    "probabilistic filter store",
583                );
584                let filter = filters
585                    .get(name)
586                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
587                let exists = filter.contains(element.as_bytes());
588                let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
589                let mut record = UnifiedRecord::new();
590                record.set("exists", Value::Boolean(exists));
591                result.push(record);
592                Ok(RuntimeQueryResult {
593                    query: raw_query.to_string(),
594                    mode: QueryMode::Sql,
595                    statement: "filter_check",
596                    engine: "runtime-probabilistic",
597                    result,
598                    affected_rows: 0,
599                    statement_type: "select",
600                })
601            }
602            ProbabilisticCommand::FilterDelete { name, element } => {
603                let mut filters = probabilistic_write(
604                    &self.inner.probabilistic.filters,
605                    "probabilistic filter store",
606                );
607                let filter = filters
608                    .get_mut(name)
609                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
610                let removed = filter.delete(element.as_bytes());
611                Ok(RuntimeQueryResult::ok_message(
612                    raw_query.to_string(),
613                    &format!(
614                        "element {} from FILTER '{}'",
615                        if removed { "deleted" } else { "not found in" },
616                        name
617                    ),
618                    "delete",
619                ))
620            }
621            ProbabilisticCommand::FilterCount { name } => {
622                let filters = probabilistic_read(
623                    &self.inner.probabilistic.filters,
624                    "probabilistic filter store",
625                );
626                let filter = filters
627                    .get(name)
628                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
629                let mut result = UnifiedResult::with_columns(vec!["count".into()]);
630                let mut record = UnifiedRecord::new();
631                record.set("count", Value::UnsignedInteger(filter.count() as u64));
632                result.push(record);
633                Ok(RuntimeQueryResult {
634                    query: raw_query.to_string(),
635                    mode: QueryMode::Sql,
636                    statement: "filter_count",
637                    engine: "runtime-probabilistic",
638                    result,
639                    affected_rows: 0,
640                    statement_type: "select",
641                })
642            }
643            ProbabilisticCommand::FilterInfo { name } => {
644                let filters = probabilistic_read(
645                    &self.inner.probabilistic.filters,
646                    "probabilistic filter store",
647                );
648                let filter = filters
649                    .get(name)
650                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
651                let mut result = UnifiedResult::with_columns(vec![
652                    "name".into(),
653                    "count".into(),
654                    "load_factor".into(),
655                    "memory_bytes".into(),
656                ]);
657                let mut record = UnifiedRecord::new();
658                record.set("name", Value::text(name.clone()));
659                record.set("count", Value::UnsignedInteger(filter.count() as u64));
660                record.set("load_factor", Value::Float(filter.load_factor()));
661                record.set(
662                    "memory_bytes",
663                    Value::UnsignedInteger(filter.memory_bytes() as u64),
664                );
665                result.push(record);
666                Ok(RuntimeQueryResult {
667                    query: raw_query.to_string(),
668                    mode: QueryMode::Sql,
669                    statement: "filter_info",
670                    engine: "runtime-probabilistic",
671                    result,
672                    affected_rows: 0,
673                    statement_type: "select",
674                })
675            }
676            ProbabilisticCommand::DropFilter { name, if_exists } => {
677                let mut filters = probabilistic_write(
678                    &self.inner.probabilistic.filters,
679                    "probabilistic filter store",
680                );
681                if filters.remove(name).is_none() {
682                    if *if_exists {
683                        return Ok(RuntimeQueryResult::ok_message(
684                            raw_query.to_string(),
685                            &format!("FILTER '{}' does not exist", name),
686                            "drop",
687                        ));
688                    }
689                    return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
690                }
691                self.drop_probabilistic_catalog_entry(name)?;
692                Ok(RuntimeQueryResult::ok_message(
693                    raw_query.to_string(),
694                    &format!("FILTER '{}' dropped", name),
695                    "drop",
696                ))
697            }
698        }
699    }
700}
701
702fn parse_probabilistic_read_projection(
703    projection: &Projection,
704    index: usize,
705) -> RedDBResult<Option<ProbabilisticReadProjection>> {
706    if let Some(column) = projection_unqualified_column(projection) {
707        if column.eq_ignore_ascii_case("CARDINALITY") {
708            return Ok(Some(ProbabilisticReadProjection::Cardinality {
709                label: probabilistic_projection_label(projection, "cardinality", index),
710            }));
711        }
712    }
713
714    let Some((function, args)) = projection_function(projection) else {
715        return Ok(None);
716    };
717    if function.eq_ignore_ascii_case("FREQ") {
718        let element = projection_single_text_arg(function, args)?;
719        return Ok(Some(ProbabilisticReadProjection::Freq {
720            element,
721            label: probabilistic_projection_label(projection, "freq", index),
722        }));
723    }
724    if function.eq_ignore_ascii_case("CONTAINS") {
725        let element = projection_single_text_arg(function, args)?;
726        return Ok(Some(ProbabilisticReadProjection::Contains {
727            element,
728            label: probabilistic_projection_label(projection, "contains", index),
729        }));
730    }
731
732    Ok(None)
733}
734
735fn validate_probabilistic_read_model(
736    collection: &str,
737    actual_model: crate::catalog::CollectionModel,
738    projections: &[ProbabilisticReadProjection],
739) -> RedDBResult<()> {
740    for projection in projections {
741        let expected_model = match projection {
742            ProbabilisticReadProjection::Cardinality { .. } => crate::catalog::CollectionModel::Hll,
743            ProbabilisticReadProjection::Freq { .. } => crate::catalog::CollectionModel::Sketch,
744            ProbabilisticReadProjection::Contains { .. } => crate::catalog::CollectionModel::Filter,
745        };
746        if actual_model != expected_model {
747            return Err(RedDBError::Query(format!(
748                "{} is only supported for {} collections; '{}' is {}",
749                probabilistic_projection_form(projection),
750                crate::runtime::ddl::polymorphic_resolver::model_name(expected_model),
751                collection,
752                crate::runtime::ddl::polymorphic_resolver::model_name(actual_model)
753            )));
754        }
755    }
756    Ok(())
757}
758
759impl RedDBRuntime {
760    fn materialize_probabilistic_select_row(
761        &self,
762        collection: &str,
763        projections: &[ProbabilisticReadProjection],
764    ) -> RedDBResult<(Vec<String>, UnifiedRecord)> {
765        let mut columns = Vec::with_capacity(projections.len());
766        let mut record = UnifiedRecord::new();
767        for projection in projections {
768            match projection {
769                ProbabilisticReadProjection::Cardinality { label } => {
770                    let hlls = probabilistic_read(
771                        &self.inner.probabilistic.hlls,
772                        "probabilistic HLL store",
773                    );
774                    let hll = hlls.get(collection).ok_or_else(|| {
775                        RedDBError::NotFound(format!("HLL '{}' not found", collection))
776                    })?;
777                    columns.push(label.clone());
778                    record.set(label, Value::UnsignedInteger(hll.count()));
779                }
780                ProbabilisticReadProjection::Freq { element, label } => {
781                    let sketches = probabilistic_read(
782                        &self.inner.probabilistic.sketches,
783                        "probabilistic sketch store",
784                    );
785                    let sketch = sketches.get(collection).ok_or_else(|| {
786                        RedDBError::NotFound(format!("SKETCH '{}' not found", collection))
787                    })?;
788                    columns.push(label.clone());
789                    record.set(
790                        label,
791                        Value::UnsignedInteger(sketch.estimate(element.as_bytes())),
792                    );
793                }
794                ProbabilisticReadProjection::Contains { element, label } => {
795                    let filters = probabilistic_read(
796                        &self.inner.probabilistic.filters,
797                        "probabilistic filter store",
798                    );
799                    let filter = filters.get(collection).ok_or_else(|| {
800                        RedDBError::NotFound(format!("FILTER '{}' not found", collection))
801                    })?;
802                    columns.push(label.clone());
803                    record.set(label, Value::Boolean(filter.contains(element.as_bytes())));
804                }
805            }
806        }
807        Ok((columns, record))
808    }
809}
810
811fn probabilistic_select_row_visible(
812    runtime: &RedDBRuntime,
813    query: &TableQuery,
814    record: &UnifiedRecord,
815) -> bool {
816    if query.limit == Some(0) || query.offset.is_some_and(|offset| offset > 0) {
817        return false;
818    }
819    let table_name = query.table.as_str();
820    let table_alias = query.alias.as_deref().unwrap_or(table_name);
821    crate::storage::query::sql_lowering::effective_table_filter(query).is_none_or(|filter| {
822        super::join_filter::evaluate_runtime_filter_with_db(
823            Some(&runtime.inner.db),
824            record,
825            &filter,
826            Some(table_name),
827            Some(table_alias),
828        )
829    })
830}
831
832fn projection_unqualified_column(projection: &Projection) -> Option<&str> {
833    match projection {
834        Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
835            Some(column.as_str())
836        }
837        Projection::Column(column) => Some(column.as_str()),
838        Projection::Alias(column, _) => Some(column.as_str()),
839        _ => None,
840    }
841}
842
843fn projection_function(projection: &Projection) -> Option<(&str, &[Projection])> {
844    match projection {
845        Projection::Function(name, args) => {
846            let function = name.split_once(':').map(|(name, _)| name).unwrap_or(name);
847            Some((function, args.as_slice()))
848        }
849        _ => None,
850    }
851}
852
853fn projection_single_text_arg(function: &str, args: &[Projection]) -> RedDBResult<String> {
854    if args.len() != 1 {
855        return Err(RedDBError::Query(format!(
856            "{function}(...) expects exactly one string literal"
857        )));
858    }
859    match &args[0] {
860        Projection::Column(column) => column
861            .strip_prefix("LIT:")
862            .map(ToString::to_string)
863            .ok_or_else(|| {
864                RedDBError::Query(format!("{function}(...) expects a string literal argument"))
865            }),
866        _ => Err(RedDBError::Query(format!(
867            "{function}(...) expects a string literal argument"
868        ))),
869    }
870}
871
872fn probabilistic_projection_label(projection: &Projection, base: &str, index: usize) -> String {
873    match projection {
874        Projection::Field(_, Some(alias)) => alias.clone(),
875        Projection::Alias(_, alias) => alias.clone(),
876        Projection::Function(name, _) => name
877            .split_once(':')
878            .map(|(_, alias)| alias.to_string())
879            .unwrap_or_else(|| numbered_probabilistic_label(base, index)),
880        _ => numbered_probabilistic_label(base, index),
881    }
882}
883
/// Builds the default label for the projection at position `index`.
///
/// The first projection (`index == 0`) uses `base` unchanged; later ones get
/// a one-based suffix, e.g. `freq_2` for `index == 1`.
fn numbered_probabilistic_label(base: &str, index: usize) -> String {
    match index {
        0 => base.to_owned(),
        position => format!("{base}_{}", position + 1),
    }
}
891
892fn probabilistic_projection_form(projection: &ProbabilisticReadProjection) -> &'static str {
893    match projection {
894        ProbabilisticReadProjection::Cardinality { .. } => "SELECT CARDINALITY",
895        ProbabilisticReadProjection::Freq { .. } => "FREQ(...)",
896        ProbabilisticReadProjection::Contains { .. } => "CONTAINS(...)",
897    }
898}