use std::sync::Arc;
use nodedb_types::DatabaseId;
use crate::bridge::physical_plan::{ColumnarOp, DocumentOp, KvOp, PhysicalPlan, TimeseriesOp};
use crate::control::state::SharedState;
/// Inputs to [`rewrite_plan_for_source`], bundled so the call site names each
/// argument explicitly.
pub struct RewriteForSourceParams<'a> {
/// The plan to rewrite; it is only read, never modified in place.
pub plan: &'a PhysicalPlan,
/// Database id combined with `target_coll` (via `db_qualified`) to form the
/// collection name the incoming plan must be addressed to.
pub target_db_id: DatabaseId,
/// Database id combined with `source_coll` (via `db_qualified`) to form the
/// collection name written into the rewritten plan.
pub source_db_id: DatabaseId,
/// Unqualified collection name on the target side.
pub target_coll: &'a str,
/// Unqualified collection name on the source side.
pub source_coll: &'a str,
/// When `Some`, replaces the plan's own `system_as_of_ms` in the rewritten
/// Document scan / point-get, Columnar scan and Timeseries scan; when `None`
/// the plan's original value is kept.
pub effective_source_ms: Option<i64>,
/// Written as `surrogate_ceiling` into rewritten KV scans and gets, replacing
/// whatever value the incoming plan carried.
pub kv_surrogate_ceiling: Option<u32>,
/// Shared state; its `surrogate_assigner` is consulted to resolve the
/// source-side surrogate when rewriting a Document point-get.
pub state: &'a Arc<SharedState>,
}
/// Re-addresses a read plan from the target collection to the source collection.
///
/// The target and source names are each built with `db_qualified` from their
/// database id and collection name. A plan is rewritten only when its
/// `collection` equals the qualified target name and it is one of:
///
/// * Document `Scan`, `PointGet`, `IndexedFetch`
/// * KV `Scan`, `Get`
/// * Columnar `Scan`
/// * Timeseries `Scan`
///
/// In the rewritten plan:
///
/// * `collection` becomes the qualified source name;
/// * where the op has a `system_as_of_ms` field, `effective_source_ms` wins
///   when it is `Some`, otherwise the plan's own value is kept;
/// * KV ops get `kv_surrogate_ceiling` as their `surrogate_ceiling`, discarding
///   the plan's original ceiling;
/// * a Document `PointGet` gets the surrogate looked up for the source
///   collection from `state.surrogate_assigner` using the plan's `pk_bytes`;
/// * every other field is copied or cloned unchanged.
///
/// # Returns
///
/// `Some(plan)` with the rewritten plan, or `None` when the plan is any other
/// operation, when it addresses a different collection, or when the
/// `PointGet` surrogate lookup fails or finds no entry.
pub fn rewrite_plan_for_source(params: RewriteForSourceParams<'_>) -> Option<PhysicalPlan> {
    use crate::control::planner::sql_plan_convert::convert::db_qualified;

    let RewriteForSourceParams {
        plan,
        target_db_id,
        source_db_id,
        target_coll,
        source_coll,
        effective_source_ms: as_of_override,
        kv_surrogate_ceiling: kv_ceiling,
        state,
    } = params;

    let target_name = db_qualified(target_db_id, target_coll);
    let source_name = db_qualified(source_db_id, source_coll);

    // The caller-supplied as-of timestamp takes precedence over the plan's own.
    let pin_as_of = |plan_as_of: Option<i64>| as_of_override.or(plan_as_of);

    match plan {
        PhysicalPlan::Document(DocumentOp::Scan {
            collection,
            limit,
            offset,
            sort_keys,
            filters,
            distinct,
            projection,
            computed_columns,
            window_functions,
            system_as_of_ms,
            valid_at_ms,
            prefilter,
        }) if *collection == target_name => {
            let rewritten = DocumentOp::Scan {
                collection: source_name,
                limit: *limit,
                offset: *offset,
                sort_keys: sort_keys.clone(),
                filters: filters.clone(),
                distinct: *distinct,
                projection: projection.clone(),
                computed_columns: computed_columns.clone(),
                window_functions: window_functions.clone(),
                system_as_of_ms: pin_as_of(*system_as_of_ms),
                valid_at_ms: *valid_at_ms,
                prefilter: prefilter.clone(),
            };
            Some(PhysicalPlan::Document(rewritten))
        }

        PhysicalPlan::Document(DocumentOp::PointGet {
            collection,
            document_id,
            surrogate: _,
            pk_bytes,
            rls_filters,
            system_as_of_ms,
            valid_at_ms,
        }) if *collection == target_name => {
            // The plan's surrogate is not reused; resolve the one registered
            // for the source collection and give up if it cannot be obtained.
            let source_surrogate = match state.surrogate_assigner.lookup(&source_name, pk_bytes) {
                Ok(Some(found)) => found,
                Ok(None) | Err(_) => return None,
            };
            let rewritten = DocumentOp::PointGet {
                collection: source_name,
                document_id: document_id.clone(),
                surrogate: source_surrogate,
                pk_bytes: pk_bytes.clone(),
                rls_filters: rls_filters.clone(),
                system_as_of_ms: pin_as_of(*system_as_of_ms),
                valid_at_ms: *valid_at_ms,
            };
            Some(PhysicalPlan::Document(rewritten))
        }

        PhysicalPlan::Document(DocumentOp::IndexedFetch {
            collection,
            path,
            value,
            filters,
            projection,
            limit,
            offset,
        }) if *collection == target_name => {
            let rewritten = DocumentOp::IndexedFetch {
                collection: source_name,
                path: path.clone(),
                value: value.clone(),
                filters: filters.clone(),
                projection: projection.clone(),
                limit: *limit,
                offset: *offset,
            };
            Some(PhysicalPlan::Document(rewritten))
        }

        PhysicalPlan::Kv(KvOp::Scan {
            collection,
            cursor,
            count,
            filters,
            match_pattern,
            sort_keys,
            surrogate_ceiling: _,
        }) if *collection == target_name => {
            let rewritten = KvOp::Scan {
                collection: source_name,
                cursor: cursor.clone(),
                count: *count,
                filters: filters.clone(),
                match_pattern: match_pattern.clone(),
                sort_keys: sort_keys.clone(),
                surrogate_ceiling: kv_ceiling,
            };
            Some(PhysicalPlan::Kv(rewritten))
        }

        PhysicalPlan::Kv(KvOp::Get {
            collection,
            key,
            rls_filters,
            surrogate_ceiling: _,
        }) if *collection == target_name => {
            let rewritten = KvOp::Get {
                collection: source_name,
                key: key.clone(),
                rls_filters: rls_filters.clone(),
                surrogate_ceiling: kv_ceiling,
            };
            Some(PhysicalPlan::Kv(rewritten))
        }

        PhysicalPlan::Columnar(ColumnarOp::Scan {
            collection,
            projection,
            limit,
            filters,
            rls_filters,
            sort_keys,
            system_as_of_ms,
            valid_at_ms,
            prefilter,
        }) if *collection == target_name => {
            let rewritten = ColumnarOp::Scan {
                collection: source_name,
                projection: projection.clone(),
                limit: *limit,
                filters: filters.clone(),
                rls_filters: rls_filters.clone(),
                sort_keys: sort_keys.clone(),
                system_as_of_ms: pin_as_of(*system_as_of_ms),
                valid_at_ms: *valid_at_ms,
                prefilter: prefilter.clone(),
            };
            Some(PhysicalPlan::Columnar(rewritten))
        }

        PhysicalPlan::Timeseries(TimeseriesOp::Scan {
            collection,
            time_range,
            projection,
            limit,
            filters,
            bucket_interval_ms,
            group_by,
            aggregates,
            gap_fill,
            computed_columns,
            rls_filters,
            system_as_of_ms,
            valid_at_ms,
        }) if *collection == target_name => {
            let rewritten = TimeseriesOp::Scan {
                collection: source_name,
                time_range: *time_range,
                projection: projection.clone(),
                limit: *limit,
                filters: filters.clone(),
                bucket_interval_ms: *bucket_interval_ms,
                group_by: group_by.clone(),
                aggregates: aggregates.clone(),
                gap_fill: gap_fill.clone(),
                computed_columns: computed_columns.clone(),
                rls_filters: rls_filters.clone(),
                system_as_of_ms: pin_as_of(*system_as_of_ms),
                valid_at_ms: *valid_at_ms,
            };
            Some(PhysicalPlan::Timeseries(rewritten))
        }

        // Everything else — other operations, or a supported operation that
        // addresses a different collection — is left for the caller to handle.
        // Variants are listed explicitly so a new `PhysicalPlan` variant is a
        // compile error here rather than a silent `None`.
        PhysicalPlan::Document(_)
        | PhysicalPlan::Kv(_)
        | PhysicalPlan::Vector(_)
        | PhysicalPlan::Graph(_)
        | PhysicalPlan::Text(_)
        | PhysicalPlan::Columnar(_)
        | PhysicalPlan::Timeseries(_)
        | PhysicalPlan::Spatial(_)
        | PhysicalPlan::Crdt(_)
        | PhysicalPlan::Query(_)
        | PhysicalPlan::Meta(_)
        | PhysicalPlan::Array(_)
        | PhysicalPlan::ClusterArray(_) => None,
    }
}
pub(super) fn extract_collection_from_plan(plan: &PhysicalPlan) -> Option<&str> {
match plan {
PhysicalPlan::Document(DocumentOp::Scan { collection, .. }) => Some(collection),
PhysicalPlan::Document(DocumentOp::PointGet { collection, .. }) => Some(collection),
PhysicalPlan::Document(DocumentOp::IndexedFetch { collection, .. }) => Some(collection),
PhysicalPlan::Document(DocumentOp::PointPut { collection, .. }) => Some(collection),
PhysicalPlan::Document(DocumentOp::PointUpdate { collection, .. }) => Some(collection),
PhysicalPlan::Document(DocumentOp::PointDelete { collection, .. }) => Some(collection),
PhysicalPlan::Document(DocumentOp::PointInsert { collection, .. }) => Some(collection),
PhysicalPlan::Kv(KvOp::Scan { collection, .. }) => Some(collection),
PhysicalPlan::Kv(KvOp::Get { collection, .. }) => Some(collection),
PhysicalPlan::Columnar(ColumnarOp::Scan { collection, .. }) => Some(collection),
PhysicalPlan::Timeseries(TimeseriesOp::Scan { collection, .. }) => Some(collection),
PhysicalPlan::Document(_)
| PhysicalPlan::Kv(_)
| PhysicalPlan::Vector(_)
| PhysicalPlan::Graph(_)
| PhysicalPlan::Text(_)
| PhysicalPlan::Columnar(_)
| PhysicalPlan::Timeseries(_)
| PhysicalPlan::Spatial(_)
| PhysicalPlan::Crdt(_)
| PhysicalPlan::Query(_)
| PhysicalPlan::Meta(_)
| PhysicalPlan::Array(_)
| PhysicalPlan::ClusterArray(_) => None,
}
}
/// Removes a leading `"<db_id>/"` from `qualified`, where `<db_id>` is the
/// decimal rendering of `db_id.as_u64()`.
///
/// `qualified` is returned unchanged when `db_id` is `DatabaseId::DEFAULT`, or
/// when it does not start with that prefix. The result always borrows from
/// `qualified`.
pub(super) fn strip_db_prefix(db_id: DatabaseId, qualified: &str) -> &str {
    if db_id == DatabaseId::DEFAULT {
        // For the default database the name is handed back untouched.
        qualified
    } else {
        let db_prefix = format!("{}/", db_id.as_u64());
        qualified
            .strip_prefix(db_prefix.as_str())
            .unwrap_or(qualified)
    }
}