use crate::api::{RedDBError, RedDBResult};
use crate::storage::schema::Value;
use crate::RedDB;
/// Name of the collection that is scanned for the configuration entry.
const RED_CONFIG_COLLECTION: &str = "red_config";
/// Configuration key holding the materialization cap; compared ASCII case-insensitively
/// against each row's `key` field.
const CONFIG_KEY: &str = "stream.executor.max_materialized_rows";
/// Row cap used when no usable configuration entry is found.
pub(crate) const DEFAULT_MAX_MATERIALIZED_ROWS: usize = 1_000_000;
pub(crate) fn max_materialized_rows(db: &RedDB) -> usize {
let mut newest: Option<(u64, u64)> = None;
if let Some(manager) = db.store().get_collection(RED_CONFIG_COLLECTION) {
manager.for_each_entity(|entity| {
let Some(row) = entity.data.as_row() else {
return true;
};
let Some(Value::Text(key)) = row.get_field("key") else {
return true;
};
if !key.eq_ignore_ascii_case(CONFIG_KEY) {
return true;
}
let parsed: Option<u64> = match row.get_field("value") {
Some(Value::Integer(v)) if *v >= 0 => Some(*v as u64),
Some(Value::UnsignedInteger(v)) => Some(*v),
Some(Value::Float(v)) if *v >= 0.0 => Some(*v as u64),
Some(Value::Text(text)) => text.trim().parse().ok(),
_ => None,
};
if let Some(v) = parsed {
let id = entity.id.raw();
if newest.is_none_or(|(best_id, _)| id >= best_id) {
newest = Some((id, v));
}
}
true
});
}
match newest {
None => DEFAULT_MAX_MATERIALIZED_ROWS,
Some((_, 0)) => usize::MAX, Some((_, v)) => v as usize,
}
}
/// Verifies that `current` does not exceed `limit`.
///
/// A count equal to the limit is still accepted; only values strictly above it are rejected.
///
/// # Errors
///
/// Returns [`RedDBError::MaterializationLimitExceeded`] carrying `executor`, `limit` and
/// `current` when `current > limit`.
pub(crate) fn check(executor: &'static str, current: usize, limit: usize) -> RedDBResult<()> {
    if current <= limit {
        return Ok(());
    }
    Err(RedDBError::MaterializationLimitExceeded {
        executor,
        limit,
        current,
    })
}
/// Checks `current` against the limit resolved from `db` by [`max_materialized_rows`].
///
/// # Errors
///
/// Propagates the error from [`check`] when `current` is above the resolved limit.
pub(crate) fn guard(db: &RedDB, executor: &'static str, current: usize) -> RedDBResult<()> {
    let limit = max_materialized_rows(db);
    check(executor, current, limit)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{RedDBOptions, RedDBRuntime};

    /// Opens a fresh in-memory runtime for a single test.
    fn in_memory_runtime() -> RedDBRuntime {
        let options = RedDBOptions::in_memory();
        RedDBRuntime::with_options(options).expect("in-memory runtime should open")
    }

    /// Counts below the limit and exactly at the limit are both accepted.
    #[test]
    fn check_passes_at_or_below_limit() {
        for current in [0, 10] {
            assert!(check("aggregation", current, 10).is_ok());
        }
    }

    /// One row past the limit is rejected and the error carries all three inputs.
    #[test]
    fn check_fails_just_past_limit_and_names_executor() {
        match check("sort", 11, 10).unwrap_err() {
            RedDBError::MaterializationLimitExceeded {
                executor: name,
                limit: cap,
                current: seen,
            } => {
                assert_eq!(name, "sort");
                assert_eq!(cap, 10);
                assert_eq!(seen, 11);
            }
            other => panic!("expected MaterializationLimitExceeded, got {other:?}"),
        }
    }

    /// With nothing configured the built-in default is returned.
    #[test]
    fn default_applies_when_key_absent() {
        let runtime = in_memory_runtime();
        let resolved = max_materialized_rows(&runtime.db());
        assert_eq!(resolved, DEFAULT_MAX_MATERIALIZED_ROWS);
    }

    /// A value stored through `SET CONFIG` is what the lookup returns.
    #[test]
    fn configured_value_is_read_back() {
        let runtime = in_memory_runtime();
        runtime
            .execute_query("SET CONFIG stream.executor.max_materialized_rows = 42")
            .expect("set config ok");
        let resolved = max_materialized_rows(&runtime.db());
        assert_eq!(resolved, 42);
    }

    /// Configuring zero lifts the cap entirely.
    #[test]
    fn zero_means_unbounded() {
        let runtime = in_memory_runtime();
        runtime
            .execute_query("SET CONFIG stream.executor.max_materialized_rows = 0")
            .expect("set config ok");
        let resolved = max_materialized_rows(&runtime.db());
        assert_eq!(resolved, usize::MAX);
    }
}