Skip to main content

reddb_server/runtime/
impl_probabilistic.rs

1//! Execution of probabilistic data structure commands (HLL, SKETCH, FILTER)
2
3use super::*;
4use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
5
6fn probabilistic_read<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockReadGuard<'a, T> {
7    lock.read()
8}
9
10fn probabilistic_write<'a, T>(lock: &'a RwLock<T>, _name: &str) -> RwLockWriteGuard<'a, T> {
11    lock.write()
12}
13
14impl RedDBRuntime {
15    pub fn execute_probabilistic_command(
16        &self,
17        raw_query: &str,
18        cmd: &ProbabilisticCommand,
19    ) -> RedDBResult<RuntimeQueryResult> {
20        // Mixed read/write surface: count/info/check are read-side and
21        // must remain available on read-only replicas; create/add/
22        // merge/delete/drop are mutations and must go through the gate.
23        let is_mutation = matches!(
24            cmd,
25            ProbabilisticCommand::CreateHll { .. }
26                | ProbabilisticCommand::HllAdd { .. }
27                | ProbabilisticCommand::HllMerge { .. }
28                | ProbabilisticCommand::DropHll { .. }
29                | ProbabilisticCommand::CreateSketch { .. }
30                | ProbabilisticCommand::SketchAdd { .. }
31                | ProbabilisticCommand::SketchMerge { .. }
32                | ProbabilisticCommand::DropSketch { .. }
33                | ProbabilisticCommand::CreateFilter { .. }
34                | ProbabilisticCommand::FilterAdd { .. }
35                | ProbabilisticCommand::FilterDelete { .. }
36                | ProbabilisticCommand::DropFilter { .. }
37        );
38        if is_mutation {
39            self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
40        }
41        match cmd {
42            // ── HyperLogLog ──────────────────────────────────────────
43            ProbabilisticCommand::CreateHll {
44                name,
45                if_not_exists,
46            } => {
47                let mut hlls =
48                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
49                if hlls.contains_key(name) {
50                    if *if_not_exists {
51                        return Ok(RuntimeQueryResult::ok_message(
52                            raw_query.to_string(),
53                            &format!("HLL '{}' already exists", name),
54                            "create",
55                        ));
56                    }
57                    return Err(RedDBError::Query(format!("HLL '{}' already exists", name)));
58                }
59                hlls.insert(
60                    name.clone(),
61                    crate::storage::primitives::hyperloglog::HyperLogLog::new(),
62                );
63                Ok(RuntimeQueryResult::ok_message(
64                    raw_query.to_string(),
65                    &format!("HLL '{}' created", name),
66                    "create",
67                ))
68            }
69            ProbabilisticCommand::HllAdd { name, elements } => {
70                let mut hlls =
71                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
72                let hll = hlls
73                    .get_mut(name)
74                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
75                for elem in elements {
76                    hll.add(elem.as_bytes());
77                }
78                Ok(RuntimeQueryResult::ok_message(
79                    raw_query.to_string(),
80                    &format!("{} element(s) added to HLL '{}'", elements.len(), name),
81                    "insert",
82                ))
83            }
84            ProbabilisticCommand::HllCount { names } => {
85                let hlls =
86                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
87                if names.len() == 1 {
88                    let hll = hlls.get(&names[0]).ok_or_else(|| {
89                        RedDBError::NotFound(format!("HLL '{}' not found", names[0]))
90                    })?;
91                    let count = hll.count();
92                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
93                    let mut record = UnifiedRecord::new();
94                    record.set("count", Value::UnsignedInteger(count));
95                    result.push(record);
96                    Ok(RuntimeQueryResult {
97                        query: raw_query.to_string(),
98                        mode: QueryMode::Sql,
99                        statement: "hll_count",
100                        engine: "runtime-probabilistic",
101                        result,
102                        affected_rows: 0,
103                        statement_type: "select",
104                    })
105                } else {
106                    // Multi-HLL count = union count
107                    let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
108                    for name in names {
109                        let hll = hlls.get(name).ok_or_else(|| {
110                            RedDBError::NotFound(format!("HLL '{}' not found", name))
111                        })?;
112                        merged.merge(hll);
113                    }
114                    let count = merged.count();
115                    let mut result = UnifiedResult::with_columns(vec!["count".into()]);
116                    let mut record = UnifiedRecord::new();
117                    record.set("count", Value::UnsignedInteger(count));
118                    result.push(record);
119                    Ok(RuntimeQueryResult {
120                        query: raw_query.to_string(),
121                        mode: QueryMode::Sql,
122                        statement: "hll_count",
123                        engine: "runtime-probabilistic",
124                        result,
125                        affected_rows: 0,
126                        statement_type: "select",
127                    })
128                }
129            }
130            ProbabilisticCommand::HllMerge { dest, sources } => {
131                let mut hlls =
132                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
133                let mut merged = crate::storage::primitives::hyperloglog::HyperLogLog::new();
134                for src in sources {
135                    let hll = hlls
136                        .get(src)
137                        .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", src)))?;
138                    merged.merge(hll);
139                }
140                hlls.insert(dest.clone(), merged);
141                Ok(RuntimeQueryResult::ok_message(
142                    raw_query.to_string(),
143                    &format!(
144                        "HLL '{}' created from merge of {}",
145                        dest,
146                        sources.join(", ")
147                    ),
148                    "create",
149                ))
150            }
151            ProbabilisticCommand::HllInfo { name } => {
152                let hlls =
153                    probabilistic_read(&self.inner.probabilistic.hlls, "probabilistic HLL store");
154                let hll = hlls
155                    .get(name)
156                    .ok_or_else(|| RedDBError::NotFound(format!("HLL '{}' not found", name)))?;
157                let mut result = UnifiedResult::with_columns(vec![
158                    "name".into(),
159                    "count".into(),
160                    "memory_bytes".into(),
161                ]);
162                let mut record = UnifiedRecord::new();
163                record.set("name", Value::text(name.clone()));
164                record.set("count", Value::UnsignedInteger(hll.count()));
165                record.set(
166                    "memory_bytes",
167                    Value::UnsignedInteger(hll.memory_bytes() as u64),
168                );
169                result.push(record);
170                Ok(RuntimeQueryResult {
171                    query: raw_query.to_string(),
172                    mode: QueryMode::Sql,
173                    statement: "hll_info",
174                    engine: "runtime-probabilistic",
175                    result,
176                    affected_rows: 0,
177                    statement_type: "select",
178                })
179            }
180            ProbabilisticCommand::DropHll { name, if_exists } => {
181                let mut hlls =
182                    probabilistic_write(&self.inner.probabilistic.hlls, "probabilistic HLL store");
183                if hlls.remove(name).is_none() {
184                    if *if_exists {
185                        return Ok(RuntimeQueryResult::ok_message(
186                            raw_query.to_string(),
187                            &format!("HLL '{}' does not exist", name),
188                            "drop",
189                        ));
190                    }
191                    return Err(RedDBError::NotFound(format!("HLL '{}' not found", name)));
192                }
193                Ok(RuntimeQueryResult::ok_message(
194                    raw_query.to_string(),
195                    &format!("HLL '{}' dropped", name),
196                    "drop",
197                ))
198            }
199
200            // ── Count-Min Sketch ───────────────────────────────────────
201            ProbabilisticCommand::CreateSketch {
202                name,
203                width,
204                depth,
205                if_not_exists,
206            } => {
207                let mut sketches = probabilistic_write(
208                    &self.inner.probabilistic.sketches,
209                    "probabilistic sketch store",
210                );
211                if sketches.contains_key(name) {
212                    if *if_not_exists {
213                        return Ok(RuntimeQueryResult::ok_message(
214                            raw_query.to_string(),
215                            &format!("SKETCH '{}' already exists", name),
216                            "create",
217                        ));
218                    }
219                    return Err(RedDBError::Query(format!(
220                        "SKETCH '{}' already exists",
221                        name
222                    )));
223                }
224                sketches.insert(
225                    name.clone(),
226                    crate::storage::primitives::count_min_sketch::CountMinSketch::new(
227                        *width, *depth,
228                    ),
229                );
230                Ok(RuntimeQueryResult::ok_message(
231                    raw_query.to_string(),
232                    &format!(
233                        "SKETCH '{}' created (width={}, depth={})",
234                        name, width, depth
235                    ),
236                    "create",
237                ))
238            }
239            ProbabilisticCommand::SketchAdd {
240                name,
241                element,
242                count,
243            } => {
244                let mut sketches = probabilistic_write(
245                    &self.inner.probabilistic.sketches,
246                    "probabilistic sketch store",
247                );
248                let sketch = sketches
249                    .get_mut(name)
250                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
251                sketch.add(element.as_bytes(), *count);
252                Ok(RuntimeQueryResult::ok_message(
253                    raw_query.to_string(),
254                    &format!("added {} to SKETCH '{}'", count, name),
255                    "insert",
256                ))
257            }
258            ProbabilisticCommand::SketchCount { name, element } => {
259                let sketches = probabilistic_read(
260                    &self.inner.probabilistic.sketches,
261                    "probabilistic sketch store",
262                );
263                let sketch = sketches
264                    .get(name)
265                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
266                let estimate = sketch.estimate(element.as_bytes());
267                let mut result = UnifiedResult::with_columns(vec!["estimate".into()]);
268                let mut record = UnifiedRecord::new();
269                record.set("estimate", Value::UnsignedInteger(estimate));
270                result.push(record);
271                Ok(RuntimeQueryResult {
272                    query: raw_query.to_string(),
273                    mode: QueryMode::Sql,
274                    statement: "sketch_count",
275                    engine: "runtime-probabilistic",
276                    result,
277                    affected_rows: 0,
278                    statement_type: "select",
279                })
280            }
281            ProbabilisticCommand::SketchMerge { dest, sources } => {
282                let mut sketches = probabilistic_write(
283                    &self.inner.probabilistic.sketches,
284                    "probabilistic sketch store",
285                );
286                let first_src = sketches.get(&sources[0]).ok_or_else(|| {
287                    RedDBError::NotFound(format!("SKETCH '{}' not found", sources[0]))
288                })?;
289                let mut merged = crate::storage::primitives::count_min_sketch::CountMinSketch::new(
290                    first_src.width(),
291                    first_src.depth(),
292                );
293                for src in sources {
294                    let sketch = sketches.get(src).ok_or_else(|| {
295                        RedDBError::NotFound(format!("SKETCH '{}' not found", src))
296                    })?;
297                    if !merged.merge(sketch) {
298                        return Err(RedDBError::Query(format!(
299                            "SKETCH '{}' has incompatible dimensions",
300                            src
301                        )));
302                    }
303                }
304                sketches.insert(dest.clone(), merged);
305                Ok(RuntimeQueryResult::ok_message(
306                    raw_query.to_string(),
307                    &format!(
308                        "SKETCH '{}' created from merge of {}",
309                        dest,
310                        sources.join(", ")
311                    ),
312                    "create",
313                ))
314            }
315            ProbabilisticCommand::SketchInfo { name } => {
316                let sketches = probabilistic_read(
317                    &self.inner.probabilistic.sketches,
318                    "probabilistic sketch store",
319                );
320                let sketch = sketches
321                    .get(name)
322                    .ok_or_else(|| RedDBError::NotFound(format!("SKETCH '{}' not found", name)))?;
323                let mut result = UnifiedResult::with_columns(vec![
324                    "name".into(),
325                    "width".into(),
326                    "depth".into(),
327                    "total".into(),
328                    "memory_bytes".into(),
329                ]);
330                let mut record = UnifiedRecord::new();
331                record.set("name", Value::text(name.clone()));
332                record.set("width", Value::UnsignedInteger(sketch.width() as u64));
333                record.set("depth", Value::UnsignedInteger(sketch.depth() as u64));
334                record.set("total", Value::UnsignedInteger(sketch.total()));
335                record.set(
336                    "memory_bytes",
337                    Value::UnsignedInteger(sketch.memory_bytes() as u64),
338                );
339                result.push(record);
340                Ok(RuntimeQueryResult {
341                    query: raw_query.to_string(),
342                    mode: QueryMode::Sql,
343                    statement: "sketch_info",
344                    engine: "runtime-probabilistic",
345                    result,
346                    affected_rows: 0,
347                    statement_type: "select",
348                })
349            }
350            ProbabilisticCommand::DropSketch { name, if_exists } => {
351                let mut sketches = probabilistic_write(
352                    &self.inner.probabilistic.sketches,
353                    "probabilistic sketch store",
354                );
355                if sketches.remove(name).is_none() {
356                    if *if_exists {
357                        return Ok(RuntimeQueryResult::ok_message(
358                            raw_query.to_string(),
359                            &format!("SKETCH '{}' does not exist", name),
360                            "drop",
361                        ));
362                    }
363                    return Err(RedDBError::NotFound(format!("SKETCH '{}' not found", name)));
364                }
365                Ok(RuntimeQueryResult::ok_message(
366                    raw_query.to_string(),
367                    &format!("SKETCH '{}' dropped", name),
368                    "drop",
369                ))
370            }
371
372            // ── Cuckoo Filter ─────────────────────────────────────────
373            ProbabilisticCommand::CreateFilter {
374                name,
375                capacity,
376                if_not_exists,
377            } => {
378                let mut filters = probabilistic_write(
379                    &self.inner.probabilistic.filters,
380                    "probabilistic filter store",
381                );
382                if filters.contains_key(name) {
383                    if *if_not_exists {
384                        return Ok(RuntimeQueryResult::ok_message(
385                            raw_query.to_string(),
386                            &format!("FILTER '{}' already exists", name),
387                            "create",
388                        ));
389                    }
390                    return Err(RedDBError::Query(format!(
391                        "FILTER '{}' already exists",
392                        name
393                    )));
394                }
395                filters.insert(
396                    name.clone(),
397                    crate::storage::primitives::cuckoo_filter::CuckooFilter::new(*capacity),
398                );
399                Ok(RuntimeQueryResult::ok_message(
400                    raw_query.to_string(),
401                    &format!("FILTER '{}' created (capacity={})", name, capacity),
402                    "create",
403                ))
404            }
405            ProbabilisticCommand::FilterAdd { name, element } => {
406                let mut filters = probabilistic_write(
407                    &self.inner.probabilistic.filters,
408                    "probabilistic filter store",
409                );
410                let filter = filters
411                    .get_mut(name)
412                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
413                if !filter.insert(element.as_bytes()) {
414                    return Err(RedDBError::Query(format!("FILTER '{}' is full", name)));
415                }
416                Ok(RuntimeQueryResult::ok_message(
417                    raw_query.to_string(),
418                    &format!("element added to FILTER '{}'", name),
419                    "insert",
420                ))
421            }
422            ProbabilisticCommand::FilterCheck { name, element } => {
423                let filters = probabilistic_read(
424                    &self.inner.probabilistic.filters,
425                    "probabilistic filter store",
426                );
427                let filter = filters
428                    .get(name)
429                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
430                let exists = filter.contains(element.as_bytes());
431                let mut result = UnifiedResult::with_columns(vec!["exists".into()]);
432                let mut record = UnifiedRecord::new();
433                record.set("exists", Value::Boolean(exists));
434                result.push(record);
435                Ok(RuntimeQueryResult {
436                    query: raw_query.to_string(),
437                    mode: QueryMode::Sql,
438                    statement: "filter_check",
439                    engine: "runtime-probabilistic",
440                    result,
441                    affected_rows: 0,
442                    statement_type: "select",
443                })
444            }
445            ProbabilisticCommand::FilterDelete { name, element } => {
446                let mut filters = probabilistic_write(
447                    &self.inner.probabilistic.filters,
448                    "probabilistic filter store",
449                );
450                let filter = filters
451                    .get_mut(name)
452                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
453                let removed = filter.delete(element.as_bytes());
454                Ok(RuntimeQueryResult::ok_message(
455                    raw_query.to_string(),
456                    &format!(
457                        "element {} from FILTER '{}'",
458                        if removed { "deleted" } else { "not found in" },
459                        name
460                    ),
461                    "delete",
462                ))
463            }
464            ProbabilisticCommand::FilterCount { name } => {
465                let filters = probabilistic_read(
466                    &self.inner.probabilistic.filters,
467                    "probabilistic filter store",
468                );
469                let filter = filters
470                    .get(name)
471                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
472                let mut result = UnifiedResult::with_columns(vec!["count".into()]);
473                let mut record = UnifiedRecord::new();
474                record.set("count", Value::UnsignedInteger(filter.count() as u64));
475                result.push(record);
476                Ok(RuntimeQueryResult {
477                    query: raw_query.to_string(),
478                    mode: QueryMode::Sql,
479                    statement: "filter_count",
480                    engine: "runtime-probabilistic",
481                    result,
482                    affected_rows: 0,
483                    statement_type: "select",
484                })
485            }
486            ProbabilisticCommand::FilterInfo { name } => {
487                let filters = probabilistic_read(
488                    &self.inner.probabilistic.filters,
489                    "probabilistic filter store",
490                );
491                let filter = filters
492                    .get(name)
493                    .ok_or_else(|| RedDBError::NotFound(format!("FILTER '{}' not found", name)))?;
494                let mut result = UnifiedResult::with_columns(vec![
495                    "name".into(),
496                    "count".into(),
497                    "load_factor".into(),
498                    "memory_bytes".into(),
499                ]);
500                let mut record = UnifiedRecord::new();
501                record.set("name", Value::text(name.clone()));
502                record.set("count", Value::UnsignedInteger(filter.count() as u64));
503                record.set("load_factor", Value::Float(filter.load_factor()));
504                record.set(
505                    "memory_bytes",
506                    Value::UnsignedInteger(filter.memory_bytes() as u64),
507                );
508                result.push(record);
509                Ok(RuntimeQueryResult {
510                    query: raw_query.to_string(),
511                    mode: QueryMode::Sql,
512                    statement: "filter_info",
513                    engine: "runtime-probabilistic",
514                    result,
515                    affected_rows: 0,
516                    statement_type: "select",
517                })
518            }
519            ProbabilisticCommand::DropFilter { name, if_exists } => {
520                let mut filters = probabilistic_write(
521                    &self.inner.probabilistic.filters,
522                    "probabilistic filter store",
523                );
524                if filters.remove(name).is_none() {
525                    if *if_exists {
526                        return Ok(RuntimeQueryResult::ok_message(
527                            raw_query.to_string(),
528                            &format!("FILTER '{}' does not exist", name),
529                            "drop",
530                        ));
531                    }
532                    return Err(RedDBError::NotFound(format!("FILTER '{}' not found", name)));
533                }
534                Ok(RuntimeQueryResult::ok_message(
535                    raw_query.to_string(),
536                    &format!("FILTER '{}' dropped", name),
537                    "drop",
538                ))
539            }
540        }
541    }
542}