use std::sync::Arc;
use nodedb_types::{CloneOrigin, CloneStatus, Lsn, TenantId};
use crate::control::state::SharedState;
use crate::types::VShardId;
use nodedb_physical::physical_task::PhysicalTask;
use super::super::metadata::ClonePredicatesNote;
use super::rewrite::rewrite_plan_for_source;
/// Read-position parameters passed to [`resolve_read`] when a query may target
/// a cloned collection.
pub struct CloneReadParams {
/// LSN the query reads at. `resolve_read` compares it against the clone's
/// creation LSN and clamps it to each origin's as-of LSN.
pub query_lsn: Lsn,
/// Optional millisecond value accompanying the query.
/// NOTE(review): presumably the wall-clock counterpart of `query_lsn`; it is
/// not read by `resolve_read` in this file — confirm where it is consumed.
pub query_ms: Option<i64>,
}
/// Result of resolving a read against a (possibly cloned) collection.
pub enum ResolveOutcome {
/// Carries a task list unchanged.
/// NOTE(review): not constructed by `resolve_read` in this file; presumably
/// used by other callers to signal "run the tasks as-is" — confirm.
Passthrough(Vec<PhysicalTask>),
/// The query LSN is lower than the LSN at which the clone was created; the
/// note records both LSNs.
PreDatesClone(ClonePredicatesNote),
/// The original tasks were extended with tasks that read from the clone's
/// source collection(s).
Augmented {
/// The original tasks followed by the rewritten source-side tasks.
tasks: Vec<PhysicalTask>,
/// Index in `tasks` of the first source-side task (equal to the number of
/// original tasks).
source_start_idx: usize,
/// Clone origin recorded on the target collection's catalog descriptor.
origin: CloneOrigin,
/// Database-qualified name of the target (clone) collection.
target_collection_key: String,
/// Optional note attached to the outcome; `resolve_read` always sets `None`.
note: Option<ClonePredicatesNote>,
},
}
pub fn resolve_read(
state: &Arc<SharedState>,
tasks: Vec<PhysicalTask>,
tenant_id: TenantId,
params: &CloneReadParams,
) -> crate::Result<Option<ResolveOutcome>> {
let Some(first_task) = tasks.first() else {
return Ok(None);
};
let db_id = first_task.database_id;
let catalog_arc = state.credentials.catalog();
let Some(catalog) = catalog_arc.as_ref() else {
return Ok(None);
};
let Some(raw_coll) = super::rewrite::extract_collection_from_plan(&first_task.plan) else {
return Ok(None);
};
let coll_name = super::rewrite::strip_db_prefix(db_id, raw_coll);
let Some(desc) = catalog
.get_collection(db_id, tenant_id.as_u64(), coll_name)
.map_err(|e| crate::Error::Storage {
engine: "catalog".into(),
detail: format!("clone resolver: get_collection failed: {e}"),
})?
else {
return Ok(None);
};
let Some(ref origin) = desc.cloned_from else {
return Ok(None);
};
match desc.clone_status {
CloneStatus::Materialized => return Ok(None),
CloneStatus::Shadowed | CloneStatus::Materializing { .. } => {}
}
if params.query_lsn < origin.clone_created_at {
return Ok(Some(ResolveOutcome::PreDatesClone(
ClonePredicatesNote::new(params.query_lsn, origin.clone_created_at),
)));
}
let effective_source_lsn = if params.query_lsn > origin.as_of_lsn {
origin.as_of_lsn
} else {
params.query_lsn
};
let effective_source_ms = state.ms_to_lsn_inverse(effective_source_lsn);
let mut augmented_tasks = tasks.clone();
let source_start_idx = augmented_tasks.len();
let mut cur_db_id = db_id;
let mut cur_coll_name_owned = coll_name.to_string();
let mut cur_origin = origin.clone();
let mut cur_effective_ms = effective_source_ms;
let mut prev_level_tasks: Vec<PhysicalTask> = tasks.clone();
const MAX_WALK: u32 = 8;
let mut depth = 0u32;
loop {
if depth >= MAX_WALK {
break;
}
depth += 1;
let src_db_id = cur_origin.source_database;
let src_coll_name = cur_origin.source_collection.as_str();
let cur_coll_str = cur_coll_name_owned.as_str();
let mut this_level_tasks: Vec<PhysicalTask> = Vec::new();
for task in &prev_level_tasks {
if let Some(source_plan) =
rewrite_plan_for_source(super::rewrite::RewriteForSourceParams {
plan: &task.plan,
target_db_id: cur_db_id,
source_db_id: src_db_id,
target_coll: cur_coll_str,
source_coll: src_coll_name,
effective_source_ms: cur_effective_ms,
kv_surrogate_ceiling: cur_origin.kv_surrogate_ceiling,
state,
})
{
let source_vshard = VShardId::from_collection_in_database(
src_db_id,
&crate::control::planner::sql_plan_convert::convert::db_qualified(
src_db_id,
src_coll_name,
),
);
let task = PhysicalTask {
tenant_id,
vshard_id: source_vshard,
database_id: src_db_id,
plan: source_plan,
post_set_op: nodedb_physical::physical_task::PostSetOp::None,
};
this_level_tasks.push(task);
}
}
for task in this_level_tasks.iter().cloned() {
augmented_tasks.push(task);
}
prev_level_tasks = this_level_tasks;
let ancestor_desc = catalog
.get_collection(src_db_id, tenant_id.as_u64(), src_coll_name)
.map_err(|e| crate::Error::Storage {
engine: "catalog".into(),
detail: format!("clone resolver: ancestor get_collection failed: {e}"),
})?;
let Some(ancestor) = ancestor_desc else { break };
match ancestor.clone_status {
CloneStatus::Materialized => break,
CloneStatus::Shadowed | CloneStatus::Materializing { .. } => {}
}
let Some(ancestor_origin) = ancestor.cloned_from else {
break;
};
let ancestor_effective_lsn = if params.query_lsn > ancestor_origin.as_of_lsn {
ancestor_origin.as_of_lsn
} else {
params.query_lsn
};
cur_effective_ms = state.ms_to_lsn_inverse(ancestor_effective_lsn);
cur_db_id = src_db_id;
cur_coll_name_owned = src_coll_name.to_string();
cur_origin = ancestor_origin;
}
let target_collection_key =
crate::control::planner::sql_plan_convert::convert::db_qualified(db_id, coll_name);
Ok(Some(ResolveOutcome::Augmented {
tasks: augmented_tasks,
source_start_idx,
origin: origin.clone(),
target_collection_key,
note: None,
}))
}