use smallvec::SmallVec;
use crate::active_query::CompletedQuery;
use crate::cycle::{CycleHeads, CycleRecoveryStrategy, IterationCount};
use crate::function::memo::Memo;
use crate::function::sync::ReleaseMode;
use crate::function::{ClaimGuard, Configuration, IngredientImpl};
use crate::hash::{FxHashSet, FxIndexSet};
use crate::ingredient::WaitForResult;
use crate::plumbing::ZalsaLocal;
use crate::sync::thread;
use crate::tracked_struct::Identity;
use crate::zalsa::{MemoIngredientIndex, Zalsa};
use crate::zalsa_local::{ActiveQueryGuard, QueryEdge, QueryEdgeKind, QueryRevisions};
use crate::{Cancelled, Cycle, tracing};
use crate::{DatabaseKeyIndex, Event, EventKind, Id};
impl<C> IngredientImpl<C>
where
C: Configuration,
{
#[inline(never)]
pub(super) fn execute<'db>(
&'db self,
db: &'db C::DbView,
mut claim_guard: ClaimGuard<'db>,
opt_old_memo: Option<&Memo<'db, C>>,
) -> Option<&'db Memo<'db, C>> {
let database_key_index = claim_guard.database_key_index();
let zalsa = claim_guard.zalsa();
let id = database_key_index.key_index();
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);
crate::tracing::info!("{:?}: executing query", database_key_index);
zalsa.event(&|| {
Event::new(EventKind::WillExecute {
database_key: database_key_index,
})
});
let (new_value, mut completed_query) = match C::CYCLE_STRATEGY {
CycleRecoveryStrategy::Panic => {
let (new_value, active_query) = Self::execute_query(
db,
zalsa,
claim_guard
.zalsa_local()
.push_query(database_key_index, IterationCount::initial()),
opt_old_memo,
);
(new_value, active_query.pop())
}
CycleRecoveryStrategy::FallbackImmediate | CycleRecoveryStrategy::Fixpoint => {
let zalsa_local = claim_guard.zalsa_local();
let was_disabled = zalsa_local.set_cancellation_disabled(true);
let res = self.execute_maybe_iterate(
db,
opt_old_memo,
&mut claim_guard,
memo_ingredient_index,
);
zalsa_local.set_cancellation_disabled(was_disabled);
res
}
};
if let Some(old_memo) = opt_old_memo {
self.backdate_if_appropriate(
old_memo,
database_key_index,
&mut completed_query.revisions,
&new_value,
);
self.diff_outputs(zalsa, database_key_index, old_memo, &completed_query);
}
let memo = self.insert_memo(
zalsa,
id,
Memo::new(
Some(new_value),
zalsa.current_revision(),
completed_query.revisions,
),
memo_ingredient_index,
);
if claim_guard.drop() { None } else { Some(memo) }
}
fn execute_maybe_iterate<'db>(
&'db self,
db: &'db C::DbView,
opt_old_memo: Option<&Memo<'db, C>>,
claim_guard: &mut ClaimGuard<'db>,
memo_ingredient_index: MemoIngredientIndex,
) -> (C::Output<'db>, CompletedQuery) {
claim_guard.set_release_mode(ReleaseMode::Default);
let database_key_index = claim_guard.database_key_index();
let zalsa = claim_guard.zalsa();
let id = database_key_index.key_index();
let mut last_provisional_memo_opt: Option<&Memo<'db, C>> = None;
let mut last_stale_tracked_ids: Vec<(Identity, Id)> = Vec::new();
let mut iteration_count = IterationCount::initial();
if let Some(old_memo) = opt_old_memo {
if old_memo.verified_at.load() == zalsa.current_revision() {
if old_memo.value.is_none() {
tracing::warn!(
"Propagating panic for cycle head that panicked in an earlier execution in that revision"
);
Cancelled::PropagatedPanic.throw();
}
if old_memo.cycle_heads().contains(&database_key_index) {
last_provisional_memo_opt = Some(old_memo);
}
iteration_count = old_memo.revisions.iteration();
}
}
let _poison_guard =
PoisonProvisionalIfPanicking::new(self, zalsa, id, memo_ingredient_index);
let (new_value, completed_query) = loop {
let active_query = claim_guard
.zalsa_local()
.push_query(database_key_index, iteration_count);
active_query.seed_tracked_struct_ids(&last_stale_tracked_ids);
let (mut new_value, mut active_query) = Self::execute_query(
db,
zalsa,
active_query,
last_provisional_memo_opt.or(opt_old_memo),
);
let mut cycle_heads = active_query.take_cycle_heads();
if cycle_heads.is_empty() {
let mut completed_query = active_query.pop();
if !iteration_count.is_initial() {
iteration_count = iteration_count.increment().unwrap_or_else(|| {
tracing::warn!(
"{database_key_index:?}: execute: too many cycle iterations"
);
panic!("{database_key_index:?}: execute: too many cycle iterations")
});
completed_query
.revisions
.update_cycle_participant_iteration_count(iteration_count);
}
break (new_value, completed_query);
}
let (max_iteration_count, depends_on_self) = collect_all_cycle_heads(
zalsa,
&mut cycle_heads,
database_key_index,
iteration_count,
);
let outer_cycle = outer_cycle(
zalsa,
claim_guard.zalsa_local(),
&cycle_heads,
database_key_index,
);
if !depends_on_self {
let Some(outer_cycle) = outer_cycle else {
panic!(
"cycle participant with non-empty cycle heads and that doesn't depend on itself must have an outer cycle responsible to finalize the query later (query: {database_key_index:?}, cycle heads: {cycle_heads:?})."
);
};
let new_value = if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate {
C::cycle_initial(db, id, C::id_to_input(zalsa, id))
} else {
new_value
};
let completed_query = complete_cycle_participant(
active_query,
claim_guard,
cycle_heads,
outer_cycle,
iteration_count,
);
break (new_value, completed_query);
}
let last_provisional_memo = last_provisional_memo_opt.unwrap_or_else(|| {
let memo = self
.get_memo_from_table_for(zalsa, id, memo_ingredient_index)
.unwrap_or_else(|| {
unreachable!(
"{database_key_index:#?} is a cycle head, \
but no provisional memo found"
)
});
debug_assert!(memo.may_be_provisional());
memo
});
let last_provisional_value = last_provisional_memo.value.as_ref();
let last_provisional_value = last_provisional_value.expect(
"`fetch_cold_cycle` should have inserted a provisional memo with Cycle::initial",
);
tracing::debug!(
"{database_key_index:?}: execute: \
I am a cycle head, comparing last provisional value with new value"
);
iteration_count = if outer_cycle.is_none() {
max_iteration_count
} else {
iteration_count
};
let value_converged = if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate {
new_value = C::cycle_initial(db, id, C::id_to_input(zalsa, id));
true
} else {
let cycle = Cycle {
head_ids: cycle_heads.ids(),
id,
iteration: iteration_count.as_u32(),
};
new_value = C::recover_from_cycle(
db,
&cycle,
last_provisional_value,
new_value,
C::id_to_input(zalsa, id),
);
C::values_equal(&new_value, last_provisional_value)
};
let new_cycle_heads = active_query.take_cycle_heads();
assert_no_new_cycle_heads(&cycle_heads, new_cycle_heads, database_key_index);
let completed_query = match try_complete_cycle_head(
active_query,
claim_guard,
cycle_heads,
&last_provisional_memo.revisions,
outer_cycle,
iteration_count,
value_converged,
) {
Ok(completed_query) => {
break (new_value, completed_query);
}
Err((completed_query, new_iteration_count)) => {
iteration_count = new_iteration_count;
completed_query
}
};
let new_memo = self.insert_memo(
zalsa,
id,
Memo::new(
Some(new_value),
zalsa.current_revision(),
completed_query.revisions,
),
memo_ingredient_index,
);
last_provisional_memo_opt = Some(new_memo);
last_stale_tracked_ids = completed_query.stale_tracked_structs;
continue;
};
tracing::debug!(
"{database_key_index:?}: execute_maybe_iterate: result.revisions = {revisions:#?}",
revisions = &completed_query.revisions
);
(new_value, completed_query)
}
#[inline]
fn execute_query<'db>(
db: &'db C::DbView,
zalsa: &'db Zalsa,
active_query: ActiveQueryGuard<'db>,
opt_old_memo: Option<&Memo<'db, C>>,
) -> (C::Output<'db>, ActiveQueryGuard<'db>) {
if let Some(old_memo) = opt_old_memo {
active_query.seed_tracked_struct_ids(old_memo.revisions.tracked_struct_ids());
if old_memo.may_be_provisional()
&& old_memo.verified_at.load() == zalsa.current_revision()
{
active_query.seed_iteration(&old_memo.revisions);
}
}
let new_value = C::execute(
db,
C::id_to_input(zalsa, active_query.database_key_index.key_index()),
);
(new_value, active_query)
}
}
struct PoisonProvisionalIfPanicking<'a, C: Configuration> {
ingredient: &'a IngredientImpl<C>,
zalsa: &'a Zalsa,
id: Id,
memo_ingredient_index: MemoIngredientIndex,
}
impl<'a, C: Configuration> PoisonProvisionalIfPanicking<'a, C> {
fn new(
ingredient: &'a IngredientImpl<C>,
zalsa: &'a Zalsa,
id: Id,
memo_ingredient_index: MemoIngredientIndex,
) -> Self {
Self {
ingredient,
zalsa,
id,
memo_ingredient_index,
}
}
}
impl<C: Configuration> Drop for PoisonProvisionalIfPanicking<'_, C> {
fn drop(&mut self) {
if thread::panicking() {
let revisions = QueryRevisions::fixpoint_initial(
self.ingredient.database_key_index(self.id),
IterationCount::initial(),
);
let memo = Memo::new(None, self.zalsa.current_revision(), revisions);
self.ingredient
.insert_memo(self.zalsa, self.id, memo, self.memo_ingredient_index);
}
}
}
fn outer_cycle(
zalsa: &Zalsa,
zalsa_local: &ZalsaLocal,
cycle_heads: &CycleHeads,
current_key: DatabaseKeyIndex,
) -> Option<DatabaseKeyIndex> {
if let Some(same_thread) = unsafe {
zalsa_local.with_query_stack_unchecked(|stack| {
stack
.iter()
.find(|active_query| {
active_query.database_key_index != current_key
&& cycle_heads.contains(&active_query.database_key_index)
})
.map(|active_query| active_query.database_key_index)
})
} {
return Some(same_thread);
}
cycle_heads
.iter_not_eq(current_key)
.rfind(|head| {
let ingredient = zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
matches!(
ingredient.wait_for(zalsa, head.database_key_index.key_index()),
WaitForResult::Cycle { inner: false }
)
})
.map(|head| head.database_key_index)
}
fn collect_all_cycle_heads(
zalsa: &Zalsa,
cycle_heads: &mut CycleHeads,
database_key_index: DatabaseKeyIndex,
iteration_count: IterationCount,
) -> (IterationCount, bool) {
fn collect_recursive(
zalsa: &Zalsa,
current_head: DatabaseKeyIndex,
me: DatabaseKeyIndex,
query_heads: &CycleHeads,
missing_heads: &mut SmallVec<[(DatabaseKeyIndex, IterationCount); 4]>,
) -> (IterationCount, bool) {
if current_head == me {
return (IterationCount::initial(), true);
}
let mut max_iteration_count = IterationCount::initial();
let mut depends_on_self = false;
let ingredient = zalsa.lookup_ingredient(current_head.ingredient_index());
let provisional_status = ingredient
.provisional_status(zalsa, current_head.key_index())
.expect("cycle head memo must have been created during the execution");
assert!(provisional_status.is_provisional());
for head in provisional_status.cycle_heads() {
let iteration_count = head.iteration_count.load();
max_iteration_count = max_iteration_count.max(iteration_count);
if query_heads.contains(&head.database_key_index) {
continue;
}
let head_as_tuple = (head.database_key_index, iteration_count);
if missing_heads.contains(&head_as_tuple) {
continue;
}
missing_heads.push((head.database_key_index, iteration_count));
let (nested_max_iteration_count, nested_depends_on_self) = collect_recursive(
zalsa,
head.database_key_index,
me,
query_heads,
missing_heads,
);
max_iteration_count = max_iteration_count.max(nested_max_iteration_count);
depends_on_self |= nested_depends_on_self;
}
(max_iteration_count, depends_on_self)
}
let mut missing_heads: SmallVec<[(DatabaseKeyIndex, IterationCount); 4]> = SmallVec::new();
let mut max_iteration_count = iteration_count;
let mut depends_on_self = false;
for head in &*cycle_heads {
let (recursive_max_iteration, recursive_depends_on_self) = collect_recursive(
zalsa,
head.database_key_index,
database_key_index,
cycle_heads,
&mut missing_heads,
);
max_iteration_count = max_iteration_count.max(recursive_max_iteration);
depends_on_self |= recursive_depends_on_self;
}
for (head, iteration) in missing_heads {
cycle_heads.insert(head, iteration);
}
(max_iteration_count, depends_on_self)
}
fn complete_cycle_participant(
active_query: ActiveQueryGuard,
claim_guard: &mut ClaimGuard,
cycle_heads: CycleHeads,
outer_cycle: DatabaseKeyIndex,
iteration_count: IterationCount,
) -> CompletedQuery {
claim_guard.set_release_mode(ReleaseMode::TransferTo(outer_cycle));
let zalsa = claim_guard.zalsa();
let database_key_index = active_query.database_key_index;
let mut completed_query = active_query.pop();
flatten_cycle_dependencies(zalsa, &mut completed_query.revisions);
*completed_query.revisions.verified_final.get_mut() = false;
completed_query.revisions.set_cycle_heads(cycle_heads);
let iteration_count = iteration_count.increment().unwrap_or_else(|| {
tracing::warn!("{database_key_index:?}: execute: too many cycle iterations");
panic!("{database_key_index:?}: execute: too many cycle iterations")
});
completed_query
.revisions
.update_cycle_participant_iteration_count(iteration_count);
completed_query
}
fn try_complete_cycle_head(
active_query: ActiveQueryGuard,
claim_guard: &mut ClaimGuard,
cycle_heads: CycleHeads,
last_provisional_revisions: &QueryRevisions,
outer_cycle: Option<DatabaseKeyIndex>,
iteration_count: IterationCount,
value_converged: bool,
) -> Result<CompletedQuery, (CompletedQuery, IterationCount)> {
let me = active_query.database_key_index;
let zalsa = claim_guard.zalsa();
let mut completed_query = active_query.pop();
flatten_cycle_dependencies(zalsa, &mut completed_query.revisions);
let metadata_converged = last_provisional_revisions.durability
== completed_query.revisions.durability
&& last_provisional_revisions.changed_at == completed_query.revisions.changed_at
&& last_provisional_revisions.origin.is_derived_untracked()
== completed_query.revisions.origin.is_derived_untracked();
let this_converged = value_converged && metadata_converged;
if let Some(outer_cycle) = outer_cycle {
tracing::info!(
"Detected nested cycle {me:?}, iterate it as part of the outer cycle {outer_cycle:?}"
);
completed_query.revisions.set_cycle_heads(cycle_heads);
completed_query
.revisions
.set_cycle_converged(this_converged);
*completed_query.revisions.verified_final.get_mut() = false;
claim_guard.set_release_mode(ReleaseMode::TransferTo(outer_cycle));
return Ok(completed_query);
}
let converged = this_converged
&& cycle_heads.iter_not_eq(me).all(|head| {
let database_key_index = head.database_key_index;
let ingredient = zalsa.lookup_ingredient(database_key_index.ingredient_index());
let converged = ingredient.cycle_converged(zalsa, database_key_index.key_index());
if !converged {
tracing::debug!("inner cycle {database_key_index:?} has not converged",);
}
converged
});
if converged {
tracing::debug!(
"{me:?}: execute: fixpoint iteration has a final value after {iteration_count:?} iterations"
);
for head in cycle_heads.iter_not_eq(me) {
let ingredient = zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
ingredient.finalize_cycle_head(zalsa, head.database_key_index.key_index());
}
*completed_query.revisions.verified_final.get_mut() = true;
zalsa.event(&|| {
Event::new(EventKind::DidFinalizeCycle {
database_key: me,
iteration_count,
})
});
return Ok(completed_query);
}
let iteration_count = iteration_count.increment().unwrap_or_else(|| {
tracing::warn!("{me:?}: execute: too many cycle iterations");
panic!("{me:?}: execute: too many cycle iterations")
});
zalsa.event(&|| {
Event::new(EventKind::WillIterateCycle {
database_key: me,
iteration_count,
})
});
tracing::info!("{me:?}: execute: iterate again ({iteration_count:?})...",);
for head in cycle_heads.iter_not_eq(me) {
let ingredient = zalsa.lookup_ingredient(head.database_key_index.ingredient_index());
ingredient.set_cycle_iteration_count(
zalsa,
head.database_key_index.key_index(),
iteration_count,
);
}
debug_assert!(completed_query.revisions.cycle_heads().is_empty());
*completed_query.revisions.verified_final.get_mut() = false;
completed_query.revisions.set_cycle_heads(cycle_heads);
completed_query
.revisions
.update_iteration_count_mut(me, iteration_count);
Err((completed_query, iteration_count))
}
fn assert_no_new_cycle_heads(
cycle_heads: &CycleHeads,
new_cycle_heads: CycleHeads,
me: DatabaseKeyIndex,
) {
for head in new_cycle_heads {
if !cycle_heads.contains(&head.database_key_index) {
panic!(
"Cycle recovery function for {me:?} introduced a cycle, depending on {:?}. This is not allowed.",
head.database_key_index
);
}
}
}
thread_local! {
static FLATTEN_MAPS: std::cell::Cell<Option<(FxIndexSet<QueryEdge>, FxHashSet<DatabaseKeyIndex>)>> = const { std::cell::Cell::new(None) };
}
fn flatten_cycle_dependencies(zalsa: &Zalsa, head: &mut QueryRevisions) {
let (mut flattened, mut seen) = FLATTEN_MAPS.take().unwrap_or_default();
debug_assert!(flattened.is_empty());
debug_assert!(seen.is_empty());
#[cfg(feature = "accumulator")]
{
assert!(
head.accumulated_inputs.load().is_empty(),
"Fixpoint iteration doesn't support accumulated values because it doesn't preserve the original query dependency tree."
)
}
let edges = head.origin.as_ref().edges();
flattened.reserve(edges.len());
for edge in head.origin.as_ref().edges() {
match edge.kind() {
QueryEdgeKind::Input(input) => {
let ingredient = zalsa.lookup_ingredient(input.ingredient_index());
ingredient.flatten_cycle_head_dependencies(
zalsa,
input.key_index(),
&mut flattened,
&mut seen,
);
}
QueryEdgeKind::Output(_) => {
flattened.insert(*edge);
}
}
}
head.origin
.set_edges(flattened.drain(..).collect())
.expect("Executing query to always be derived or derived untracked.");
seen.clear();
FLATTEN_MAPS.set(Some((flattened, seen)));
}