use std::collections::HashSet;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::dependency_targets::{
classify_dependency_target, collect_active_change_ids, collect_archived_change_ids,
collect_rejected_change_ids, DependencyTargetClass,
};
use crate::error::{OrchestratorError, Result};
use crate::events::{ExecutionEvent, LogEntry};
use crate::orchestration::{
execute_rejection_flow, handle_blocked_from_rejecting, handle_resume_apply_from_rejecting,
run_rejection_review, RejectionReviewVerdict,
};
use tokio::sync::{mpsc, Semaphore};
use tokio::task::JoinSet;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(super) struct QueueReconciliationOutcome {
pub queued_added: usize,
pub repair_added: usize,
}
impl QueueReconciliationOutcome {
#[cfg(test)]
pub fn total_added(self) -> usize {
self.queued_added + self.repair_added
}
pub fn has_queued_additions(self) -> bool {
self.queued_added > 0
}
pub fn has_repair_additions(self) -> bool {
self.repair_added > 0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QueueReconciliationDiagnosticLevel {
Info,
Warn,
}
use super::acceptance_state::delete_acceptance_state;
use super::cleanup::WorkspaceCleanupGuard;
use super::dispatch::archived_dirty_repair_candidate_from_workspace;
use super::dynamic_queue::ReanalysisReason;
use super::events::send_event;
use super::merge::base_dirty_reason;
use super::{MergeResult, MergeTaskOutcome, ParallelEvent, ParallelExecutor, WorkspaceResult};
fn on_merged_failure_message(change_id: &str, error: &OrchestratorError) -> String {
format!(
"on_merged hook failed for '{}'; merged transition blocked: {}",
change_id, error
)
}
impl ParallelExecutor {
pub async fn should_reanalyze(&self, bypass_debounce: bool) -> bool {
super::dynamic_queue::should_reanalyze_queue(&self.last_queue_change_at, bypass_debounce)
.await
}
pub(super) fn is_cancelled(&self) -> bool {
self.cancel_token
.as_ref()
.is_some_and(|token| token.is_cancelled())
}
pub(super) fn sync_resolve_wait_from_shared_state_nonblocking(&mut self) {
if let Some(shared) = &self.shared_orchestrator_state {
if let Ok(guard) = shared.try_read() {
self.resolve_wait_changes = guard.resolve_wait_change_ids().into_iter().collect();
self.reject_wait_changes = guard.reject_wait_change_ids().into_iter().collect();
}
}
}
pub(super) async fn clear_resolve_wait_intent_for_outcome(&mut self, change_id: &str) {
self.resolve_wait_changes.remove(change_id);
self.last_dispatched_resolve_wait_changes.remove(change_id);
if let Some(shared) = &self.shared_orchestrator_state {
let mut guard = shared.write().await;
guard.clear_resolve_wait_intent(change_id);
}
}
pub(super) async fn clear_reject_wait_intent_for_success(&mut self, change_id: &str) {
self.reject_wait_changes.remove(change_id);
self.last_dispatched_reject_wait_changes.remove(change_id);
if let Some(shared) = &self.shared_orchestrator_state {
let mut guard = shared.write().await;
guard.clear_reject_wait_intent(change_id);
}
}
async fn apply_rejection_review_event_in_shared_state(&mut self, event: &ExecutionEvent) {
if let Some(shared) = &self.shared_orchestrator_state {
let mut guard = shared.write().await;
guard.apply_execution_event(event);
}
}
pub(super) async fn mark_deferred_merge_completed_in_shared_state(
&mut self,
change_id: &str,
revision: &str,
) {
if let Some(shared) = &self.shared_orchestrator_state {
let mut guard = shared.write().await;
guard.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: change_id.to_string(),
revision: revision.to_string(),
});
}
}
async fn mark_on_merged_failure_in_shared_state(&mut self, change_id: &str, error: &str) {
if let Some(shared) = &self.shared_orchestrator_state {
let mut guard = shared.write().await;
guard.apply_execution_event(&ExecutionEvent::HookFailed {
change_id: change_id.to_string(),
hook_type: crate::hooks::HookType::OnMerged.to_string(),
error: error.to_string(),
});
}
}
pub(super) fn trigger_resolve_wait_retry_dispatch(&mut self) {
self.resolve_wait_retry_triggered = true;
}
pub(super) fn should_dispatch_resolve_wait_retry(&self) -> bool {
if self.resolve_wait_changes.is_empty() && self.reject_wait_changes.is_empty() {
return false;
}
self.resolve_wait_retry_triggered
|| self.last_dispatched_resolve_wait_changes != self.resolve_wait_changes
|| self.last_dispatched_reject_wait_changes != self.reject_wait_changes
}
pub(super) async fn maybe_dispatch_resolve_wait_retry(&mut self) {
let base_dirty_changed_to_clean = self
.resolve_wait_base_dirty_changed_to_clean()
.await
.unwrap_or(false);
if !self.should_dispatch_resolve_wait_retry() && !base_dirty_changed_to_clean {
return;
}
self.retry_deferred_base_lane_waiters().await;
self.last_dispatched_resolve_wait_changes = self.resolve_wait_changes.clone();
self.last_dispatched_reject_wait_changes = self.reject_wait_changes.clone();
self.resolve_wait_retry_triggered = false;
}
async fn resolve_wait_base_dirty_changed_to_clean(&mut self) -> Result<bool> {
if self.resolve_wait_changes.is_empty() && self.reject_wait_changes.is_empty() {
self.last_resolve_wait_base_dirty = None;
return Ok(false);
}
let base_dirty = base_dirty_reason(&self.repo_root).await?.is_some();
let changed_to_clean =
matches!(self.last_resolve_wait_base_dirty, Some(true)) && !base_dirty;
if changed_to_clean {
info!(
repo_root = %self.repo_root.display(),
resolve_wait_count = self.resolve_wait_changes.len(),
reject_wait_count = self.reject_wait_changes.len(),
"Base repository transitioned from dirty to clean while base-lane waiters exist; waking retry dispatch"
);
}
self.last_resolve_wait_base_dirty = Some(base_dirty);
Ok(changed_to_clean)
}
pub(crate) fn has_resolve_wait(&self) -> bool {
if let Some(shared) = &self.shared_orchestrator_state {
if let Ok(guard) = shared.try_read() {
return !guard.resolve_wait_change_ids().is_empty()
|| !guard.reject_wait_change_ids().is_empty();
}
}
!self.resolve_wait_changes.is_empty() || !self.reject_wait_changes.is_empty()
}
#[allow(dead_code)]
pub(super) fn skip_reason_for_change(&self, change_id: &str) -> Option<String> {
if let Some(failed_dep) = self.failed_tracker.should_skip(change_id) {
return Some(format!("Dependency '{}' failed", failed_dep));
}
None
}
fn active_dependency_ids(&self) -> HashSet<String> {
collect_active_change_ids(&self.repo_root)
}
fn archived_dependency_ids(&self) -> HashSet<String> {
collect_archived_change_ids(&self.repo_root)
}
fn rejected_dependency_ids(&self) -> HashSet<String> {
collect_rejected_change_ids(&self.repo_root)
}
fn dependency_blocker_fingerprint(
change_id: &str,
blockers: &[(String, DependencyTargetClass)],
) -> super::DependencyBlockerFingerprint {
let mut fingerprint = blockers
.iter()
.map(|(dep_id, class)| (dep_id.clone(), class.as_str().to_string()))
.collect::<Vec<_>>();
fingerprint.sort();
fingerprint.insert(0, ("change_id".to_string(), change_id.to_string()));
fingerprint
}
fn should_emit_dependency_blocked_transition(
&mut self,
change_id: &str,
blockers: &[(String, DependencyTargetClass)],
) -> bool {
let fingerprint = Self::dependency_blocker_fingerprint(change_id, blockers);
let changed = self
.dependency_blocker_fingerprints
.get(change_id)
.is_none_or(|previous| previous != &fingerprint);
if changed {
self.dependency_blocker_fingerprints
.insert(change_id.to_string(), fingerprint.clone());
self.dependency_blocker_diagnostics_seen
.insert((change_id.to_string(), fingerprint));
return true;
}
false
}
async fn emit_dependency_blocker_diagnostic(
&self,
change_id: &str,
blockers: &[(String, DependencyTargetClass)],
) {
for (dep_id, class) in blockers {
let message = format!(
"Change '{}' blocked by {} dependency '{}' and will remain queued",
change_id,
class.as_str(),
dep_id
);
match class {
DependencyTargetClass::Missing
| DependencyTargetClass::Rejected
| DependencyTargetClass::Error => warn!("{}", message),
DependencyTargetClass::Queued
| DependencyTargetClass::InFlight
| DependencyTargetClass::ActiveButNotQueued => info!("{}", message),
DependencyTargetClass::Archived => debug!("{}", message),
}
if matches!(
class,
DependencyTargetClass::Missing
| DependencyTargetClass::Rejected
| DependencyTargetClass::Error
) {
send_event(&self.event_tx, ParallelEvent::Error { message }).await;
}
}
}
pub(super) async fn is_dependency_resolved(&self, dep_id: &str) -> Result<bool> {
let original_branch = self
.workspace_manager
.ensure_original_branch_initialized()
.await
.map_err(OrchestratorError::from_vcs_error)?;
match crate::execution::state::is_merged_to_base(dep_id, &self.repo_root, &original_branch)
.await
{
Ok(is_merged) => Ok(is_merged),
Err(e) => {
warn!(
"Failed to check if dependency '{}' is merged to base: {}, assuming not resolved",
dep_id, e
);
Ok(false)
}
}
}
pub(super) fn manual_resolve_active(&self) -> usize {
self.manual_resolve_count
.as_ref()
.map(|counter| counter.load(std::sync::atomic::Ordering::Relaxed))
.unwrap_or(0)
}
pub(super) fn calculate_available_slots(
&self,
max_parallelism: usize,
in_flight: &HashSet<String>,
) -> usize {
let manual_resolve_count = self.manual_resolve_active();
let auto_resolve_count = self
.auto_resolve_count
.load(std::sync::atomic::Ordering::Relaxed);
max_parallelism
.saturating_sub(in_flight.len())
.saturating_sub(manual_resolve_count)
.saturating_sub(auto_resolve_count)
}
pub(super) fn filter_executable_changes(
&self,
queued: &[crate::openspec::Change],
) -> (Vec<crate::openspec::Change>, Vec<(String, String)>) {
let mut executable_changes: Vec<crate::openspec::Change> = Vec::new();
let mut skipped_changes: Vec<(String, String)> = Vec::new();
for change in queued {
if let Some(failed_dep) = self.failed_tracker.should_skip(&change.id) {
let reason = format!("Dependency '{}' failed", failed_dep);
warn!(
"Skipping change-{} because dependency change-{} failed",
change.id, failed_dep
);
skipped_changes.push((change.id.clone(), reason));
} else {
executable_changes.push(change.clone());
}
}
(executable_changes, skipped_changes)
}
pub(super) async fn select_changes_for_dispatch(
&mut self,
analysis_result: &crate::analyzer::AnalysisResult,
available_slots: usize,
in_flight: &HashSet<String>,
) -> Vec<String> {
let mut selected_changes: Vec<String> = Vec::new();
let queued_ids: HashSet<&str> = analysis_result.order.iter().map(String::as_str).collect();
let in_flight_ids: HashSet<&str> = in_flight.iter().map(String::as_str).collect();
let active_ids = self.active_dependency_ids();
let active_refs: HashSet<&str> = active_ids.iter().map(String::as_str).collect();
let archived_ids = self.archived_dependency_ids();
let rejected_ids = self.rejected_dependency_ids();
for change_id in &analysis_result.order {
if let Some(shared) = &self.shared_orchestrator_state {
if let Ok(guard) = shared.try_read() {
if guard.is_terminal_error_change(change_id) {
info!(
change_id = %change_id,
"Skipping ordinary apply dispatch because terminal error requires explicit retry"
);
continue;
}
}
}
if let Some(deps) = analysis_result.dependencies.get(change_id) {
let mut unresolved_deps = Vec::new();
let mut blockers = Vec::new();
for dep_id in deps {
let class = classify_dependency_target(
dep_id,
queued_ids.iter().copied(),
in_flight_ids.iter().copied(),
active_refs.iter().copied(),
&archived_ids,
&rejected_ids,
);
match class {
DependencyTargetClass::Archived => {
info!(
"Change '{}' dependency '{}' is archived and already satisfied",
change_id, dep_id
);
continue;
}
DependencyTargetClass::Missing | DependencyTargetClass::Rejected => {
unresolved_deps.push(dep_id.clone());
blockers.push((dep_id.clone(), class));
continue;
}
DependencyTargetClass::Queued
| DependencyTargetClass::InFlight
| DependencyTargetClass::ActiveButNotQueued => {}
DependencyTargetClass::Error => unreachable!(
"error dependency class is derived from reducer state, not repository classification"
),
}
if let Some(shared) = &self.shared_orchestrator_state {
if let Ok(guard) = shared.try_read() {
if guard.is_terminal_error_change(dep_id) {
warn!(
change_id = %change_id,
dependency = %dep_id,
"Blocking dispatch because dependency is in terminal error and requires explicit retry"
);
unresolved_deps.push(dep_id.clone());
blockers.push((dep_id.clone(), DependencyTargetClass::Error));
continue;
}
}
}
match self.is_dependency_resolved(dep_id).await {
Ok(true) => {}
Ok(false) => {
unresolved_deps.push(dep_id.clone());
blockers.push((dep_id.clone(), class));
}
Err(e) => {
error!(
"Failed to evaluate dependency resolution for '{}' (dependency '{}'): {}",
change_id, dep_id, e
);
send_event(
&self.event_tx,
ParallelEvent::Error {
message: format!(
"Failed to evaluate dependency resolution for '{}' (dependency '{}'): {}",
change_id, dep_id, e
),
},
)
.await;
unresolved_deps.push(dep_id.clone());
blockers.push((dep_id.clone(), class));
}
}
}
if !unresolved_deps.is_empty() {
if self.should_emit_dependency_blocked_transition(change_id, &blockers) {
info!(
"Change '{}' blocked: waiting for dependencies {:?}",
change_id, unresolved_deps
);
self.emit_dependency_blocker_diagnostic(change_id, &blockers)
.await;
send_event(
&self.event_tx,
ParallelEvent::DependencyBlocked {
change_id: change_id.clone(),
dependency_ids: unresolved_deps,
},
)
.await;
} else {
debug!(
change_id,
blockers = ?blockers,
"Suppressing repeated dependency blocked transition"
);
}
continue;
}
}
if self
.dependency_blocker_fingerprints
.remove(change_id)
.is_some()
{
info!(
"Change '{}' dependencies resolved, forcing fresh workspace recreation",
change_id
);
self.force_recreate_worktree.insert(change_id.clone());
send_event(
&self.event_tx,
ParallelEvent::DependencyResolved {
change_id: change_id.clone(),
},
)
.await;
}
if selected_changes.len() < available_slots {
selected_changes.push(change_id.clone());
}
}
selected_changes
}
pub(super) async fn handle_workspace_completion(
&mut self,
workspace_result: WorkspaceResult,
max_parallelism: usize,
in_flight: &mut HashSet<String>,
merge_result_tx: &mpsc::Sender<MergeResult>,
) {
in_flight.remove(&workspace_result.change_id);
if let Some(ref queue) = self.dynamic_queue {
queue
.unregister_kill_token(&workspace_result.change_id)
.await;
}
info!(
"Task completed: change='{}', in_flight={}, available_slots={}, error={:?}",
workspace_result.change_id,
in_flight.len(),
max_parallelism.saturating_sub(in_flight.len()),
workspace_result.error
);
if let Some(error) = &workspace_result.error {
error!("Change '{}' failed: {}", workspace_result.change_id, error);
self.failed_tracker.mark_failed(&workspace_result.change_id);
send_event(
&self.event_tx,
ParallelEvent::ProcessingError {
id: workspace_result.change_id.clone(),
error: error.clone(),
},
)
.await;
self.retry_deferred_base_lane_waiters().await;
} else if let Some(reason) = &workspace_result.rejected {
info!(
"Change '{}' rejected after acceptance blocker: {}",
workspace_result.change_id, reason
);
send_event(
&self.event_tx,
ParallelEvent::ChangeRejected {
change_id: workspace_result.change_id.clone(),
reason: reason.clone(),
},
)
.await;
let workspace_path = self
.workspace_manager
.find_existing_workspace(&workspace_result.change_id)
.await
.ok()
.flatten()
.map(|info| info.path);
if let Err(e) = self
.workspace_manager
.cleanup_workspace(&workspace_result.workspace_name)
.await
{
error!(
"Failed to cleanup rejected workspace '{}' for change '{}': {}",
workspace_result.workspace_name, workspace_result.change_id, e
);
} else if let Some(workspace_path) = workspace_path {
if let Err(err) = delete_acceptance_state(&workspace_path) {
warn!(
"Failed to delete acceptance state for rejected change '{}': {}",
workspace_result.change_id, err
);
}
}
self.retry_deferred_base_lane_waiters().await;
} else {
info!(
"Change '{}' completed successfully",
workspace_result.change_id
);
if workspace_result.final_revision.is_some() {
self.spawn_merge_task(workspace_result, merge_result_tx.clone());
}
}
}
fn spawn_merge_task(
&self,
workspace_result: WorkspaceResult,
merge_result_tx: mpsc::Sender<MergeResult>,
) {
let mut merge_executor = ParallelExecutor::new(
self.repo_root.clone(),
self.config.clone(),
self.event_tx.clone(),
);
merge_executor.max_conflict_retries = self.max_conflict_retries;
merge_executor.shared_stagger_state = self.shared_stagger_state.clone();
merge_executor.auto_resolve_count = self.auto_resolve_count.clone();
merge_executor.pending_merge_count = self.pending_merge_count.clone();
merge_executor.cancel_token = self.cancel_token.clone();
merge_executor.manual_resolve_count = self.manual_resolve_count.clone();
merge_executor.hooks = self.hooks.clone();
self.pending_merge_count.fetch_add(1, Ordering::Relaxed);
tokio::spawn(async move {
let change_id = workspace_result.change_id.clone();
let workspace_name = workspace_result.workspace_name.clone();
let Some(_active_merge_guard) =
super::merge::ActivePostArchiveMergeGuard::acquire(change_id.clone())
else {
info!(
change_id = %change_id,
workspace = %workspace_name,
"Skipping duplicate post-archive merge task because the same change is already active"
);
if let Err(send_error) = merge_result_tx
.send(MergeResult {
change_id,
workspace_name,
outcome: Ok(MergeTaskOutcome::deferred(
"duplicate post-archive merge task suppressed because the same change is already active",
true,
)),
})
.await
{
warn!(
"Failed to send duplicate merge suppression result to scheduler loop: {}",
send_error
);
}
return;
};
let outcome = merge_executor
.handle_merge_and_cleanup(workspace_result)
.await
.map_err(|error| error.to_string());
if let Err(send_error) = merge_result_tx
.send(MergeResult {
change_id,
workspace_name,
outcome,
})
.await
{
warn!(
"Failed to send merge result to scheduler loop: {}",
send_error
);
}
});
}
pub(super) async fn handle_merge_result(&mut self, merge_result: MergeResult) -> bool {
self.pending_merge_count.fetch_sub(1, Ordering::Relaxed);
match merge_result.outcome {
Ok(MergeTaskOutcome::Merged) => {
info!(
"Background merge task completed successfully for '{}'",
merge_result.change_id
);
self.retry_deferred_base_lane_waiters().await;
true
}
Ok(MergeTaskOutcome::Deferred {
reason,
auto_resumable,
}) => {
info!(
"Background merge task deferred for '{}' (workspace '{}', auto_resumable={}): {}",
merge_result.change_id, merge_result.workspace_name, auto_resumable, reason
);
false
}
Err(error) => {
error!(
"Background merge task failed for '{}' (workspace '{}'): {}",
merge_result.change_id, merge_result.workspace_name, error
);
send_event(
&self.event_tx,
ParallelEvent::Error {
message: format!(
"Background merge failed for '{}' (workspace '{}'): {}",
merge_result.change_id, merge_result.workspace_name, error
),
},
)
.await;
false
}
}
}
pub(super) async fn retry_deferred_base_lane_waiters(&mut self) {
let Some(shared) = &self.shared_orchestrator_state else {
self.retry_deferred_merges().await;
return;
};
let promoted = {
let mut guard = shared.write().await;
guard.promote_next_base_mutating_lane_waiter()
};
match promoted {
Some((change_id, crate::orchestration::state::WaitState::ResolveWait)) => {
self.resolve_wait_changes.insert(change_id.clone());
self.retry_deferred_merges_for(vec![change_id]).await;
}
Some((change_id, crate::orchestration::state::WaitState::RejectWait)) => {
self.reject_wait_changes.insert(change_id.clone());
self.retry_deferred_rejection_review_for(change_id).await;
}
Some((change_id, wait_state)) => {
warn!(
"Ignoring unsupported base-mutating lane promotion for '{}' with wait state {:?}",
change_id, wait_state
);
}
None => {}
}
if let Some(shared) = &self.shared_orchestrator_state {
let guard = shared.read().await;
self.resolve_wait_changes = guard.resolve_wait_change_ids().into_iter().collect();
self.reject_wait_changes = guard.reject_wait_change_ids().into_iter().collect();
}
}
pub(super) async fn retry_deferred_merges(&mut self) {
if let Some(shared) = &self.shared_orchestrator_state {
let guard = shared.read().await;
self.resolve_wait_changes = guard.resolve_wait_change_ids().into_iter().collect();
}
if self.resolve_wait_changes.is_empty() {
return;
}
let deferred: Vec<String> = self.resolve_wait_changes.iter().cloned().collect();
self.retry_deferred_merges_for(deferred).await;
}
pub(super) fn stale_retry_reason(workspace_info: &crate::vcs::WorkspaceInfo) -> Option<String> {
if workspace_info.path.exists() {
return None;
}
Some(format!(
"workspace path '{}' no longer exists",
workspace_info.path.display()
))
}
async fn retry_deferred_merges_for(&mut self, deferred: Vec<String>) {
for change_id in deferred.into_iter().take(1) {
if self.is_change_already_merged_to_base(&change_id).await {
info!(
"Skipping stale deferred merge retry for '{}' because it is already merged to base",
change_id
);
self.clear_resolve_wait_intent_for_outcome(&change_id).await;
continue;
}
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::info(format!(
"ResolveWait retry dispatch started for '{}'",
change_id
))),
)
.await;
let workspace_info = match self
.workspace_manager
.find_existing_workspace(&change_id)
.await
{
Ok(Some(ws)) => ws,
Ok(None) => {
warn!(
"No workspace found for deferred change '{}', clearing stale retry intent",
change_id
);
self.clear_resolve_wait_intent_for_outcome(&change_id).await;
continue;
}
Err(e) => {
warn!(
"Failed to find workspace for deferred change '{}': {}",
change_id, e
);
continue;
}
};
if let Some(stale_reason) = Self::stale_retry_reason(&workspace_info) {
warn!(
change_id = %change_id,
workspace = %workspace_info.workspace_name,
workspace_path = %workspace_info.path.display(),
stale_reason = %stale_reason,
"Deferred merge retry workspace path is stale; clearing retry intent"
);
self.clear_resolve_wait_intent_for_outcome(&change_id).await;
continue;
}
info!(
"Retrying deferred merge for '{}' (workspace: {})",
change_id, workspace_info.workspace_name
);
let revisions = vec![workspace_info.workspace_name.clone()];
let change_ids = vec![change_id.clone()];
let archive_paths = vec![workspace_info.path.clone()];
match self
.attempt_merge(&revisions, &change_ids, &archive_paths)
.await
{
Ok(super::merge::MergeAttempt::Merged { revision }) => {
info!("Deferred merge succeeded for '{}' on retry", change_id);
if let Some(ref hooks) = self.hooks {
let (completed_tasks, total_tasks) =
match crate::openspec::list_changes_native() {
Ok(changes) => changes
.iter()
.find(|c| c.id == change_id)
.map(|c| (c.completed_tasks, c.total_tasks))
.unwrap_or((0, 0)),
Err(e) => {
warn!("Failed to fetch task counts for on_merged hook: {}", e);
(0, 0)
}
};
let ws_path = workspace_info.path.to_string_lossy().to_string();
let hook_ctx = crate::hooks::HookContext::new(0, 0, 0, false)
.with_change(&change_id, completed_tasks, total_tasks)
.with_apply_count(0)
.with_parallel_context(&ws_path, None);
if let Err(e) = hooks
.run_hook(crate::hooks::HookType::OnMerged, &hook_ctx)
.await
{
let message = on_merged_failure_message(&change_id, &e);
error!("{}", message);
self.clear_resolve_wait_intent_for_outcome(&change_id).await;
self.mark_on_merged_failure_in_shared_state(&change_id, &message)
.await;
send_event(
&self.event_tx,
ParallelEvent::HookFailed {
change_id: change_id.clone(),
hook_type: crate::hooks::HookType::OnMerged.to_string(),
error: e.to_string(),
},
)
.await;
send_event(
&self.event_tx,
ParallelEvent::ResolveFailed {
change_id: change_id.clone(),
error: message,
},
)
.await;
continue;
}
}
self.clear_resolve_wait_intent_for_outcome(&change_id).await;
self.mark_deferred_merge_completed_in_shared_state(&change_id, &revision)
.await;
send_event(
&self.event_tx,
ParallelEvent::MergeCompleted {
change_id: change_id.clone(),
revision: revision.clone(),
},
)
.await;
send_event(
&self.event_tx,
ParallelEvent::CleanupStarted {
workspace: workspace_info.workspace_name.clone(),
},
)
.await;
if let Err(e) = self
.workspace_manager
.cleanup_workspace(&workspace_info.workspace_name)
.await
{
warn!(
"Failed to cleanup workspace '{}' after deferred merge retry: {}",
workspace_info.workspace_name, e
);
} else {
if let Err(err) = delete_acceptance_state(&workspace_info.path) {
warn!(
"Failed to delete acceptance state for '{}' after deferred merge cleanup: {}",
change_id,
err
);
}
send_event(
&self.event_tx,
ParallelEvent::CleanupCompleted {
workspace: workspace_info.workspace_name.clone(),
},
)
.await;
}
}
Ok(super::merge::MergeAttempt::Deferred(deferred)) => {
info!(
"Deferred merge still blocked for '{}': {} (auto_resumable={})",
change_id, deferred.reason, deferred.auto_resumable
);
if deferred.auto_resumable {
self.merge_wait_changes.remove(&change_id);
} else {
self.resolve_wait_changes.remove(&change_id);
self.merge_wait_changes.insert(change_id.clone());
}
send_event(
&self.event_tx,
ParallelEvent::MergeDeferred {
change_id: change_id.clone(),
reason: deferred.reason,
auto_resumable: deferred.auto_resumable,
},
)
.await;
}
Err(e) => {
error!("Deferred merge retry error for '{}': {}", change_id, e);
}
}
}
}
async fn retry_deferred_rejection_review_for(&mut self, change_id: String) {
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::info(format!(
"RejectWait retry dispatch started for '{}'",
change_id
))),
)
.await;
let workspace_info = match self
.workspace_manager
.find_existing_workspace(&change_id)
.await
{
Ok(Some(ws)) => ws,
Ok(None) => {
warn!(
"No workspace found for deferred rejection review '{}', clearing reject wait",
change_id
);
self.clear_reject_wait_intent_for_success(&change_id).await;
return;
}
Err(e) => {
warn!(
"Failed to find workspace for deferred rejection review '{}': {}",
change_id, e
);
return;
}
};
send_event(
&self.event_tx,
ParallelEvent::WorkspaceStatusUpdated {
change_id: change_id.clone(),
workspace_name: workspace_info.workspace_name.clone(),
status: crate::vcs::WorkspaceStatus::Rejecting,
},
)
.await;
self.clear_reject_wait_intent_for_success(&change_id).await;
match run_rejection_review(
&change_id,
&workspace_info.path,
&self.config,
&self.ai_runner,
)
.await
{
Ok(RejectionReviewVerdict::Confirm) => {
let rejected_path = workspace_info
.path
.join("openspec")
.join("changes")
.join(&change_id)
.join("REJECTED.md");
let reason = format!(
"Rejecting review confirmed rejection (proposal: {})",
rejected_path.display()
);
let base_branch = self
.workspace_manager
.ensure_original_branch_initialized()
.await
.unwrap_or_else(|error| {
warn!(
"Failed to resolve base branch while confirming deferred rejection review for '{}': {}",
change_id, error
);
"main".to_string()
});
match execute_rejection_flow(
&change_id,
&reason,
&workspace_info.path,
&base_branch,
&self.repo_root,
)
.await
{
Ok(()) => {
let completed_event = ParallelEvent::RejectionReviewCompleted {
change_id: change_id.clone(),
outcome: crate::events::RejectionOutcome::Confirm,
};
self.apply_rejection_review_event_in_shared_state(&completed_event)
.await;
send_event(&self.event_tx, completed_event).await;
send_event(
&self.event_tx,
ParallelEvent::ChangeRejected {
change_id: change_id.clone(),
reason,
},
)
.await;
send_event(
&self.event_tx,
ParallelEvent::ChangeDequeued {
change_id: change_id.clone(),
},
)
.await;
}
Err(error) => {
error!(
"Deferred rejection review confirm flow failed for '{}': {}",
change_id, error
);
let failed_event = ParallelEvent::RejectionReviewFailed {
change_id: change_id.clone(),
error: format!(
"Rejected flow failed after deferred rejecting CONFIRM verdict: {}",
error
),
};
self.apply_rejection_review_event_in_shared_state(&failed_event)
.await;
send_event(&self.event_tx, failed_event).await;
}
}
}
Ok(RejectionReviewVerdict::Resume) => {
match handle_resume_apply_from_rejecting(&change_id, &workspace_info.path).await {
Ok(()) => {
let completed_event = ParallelEvent::RejectionReviewCompleted {
change_id: change_id.clone(),
outcome: crate::events::RejectionOutcome::Resume,
};
self.apply_rejection_review_event_in_shared_state(&completed_event)
.await;
send_event(&self.event_tx, completed_event).await;
send_event(
&self.event_tx,
ParallelEvent::Log(
LogEntry::warn(
"Deferred rejecting review returned RESUME; workspace is ready for apply resume",
)
.with_change_id(&change_id)
.with_operation("rejecting"),
),
)
.await;
}
Err(error) => {
let failed_event = ParallelEvent::RejectionReviewFailed {
change_id: change_id.clone(),
error: error.to_string(),
};
self.apply_rejection_review_event_in_shared_state(&failed_event)
.await;
send_event(&self.event_tx, failed_event).await;
}
}
}
Ok(RejectionReviewVerdict::Block) => {
match handle_blocked_from_rejecting(&change_id, &workspace_info.path).await {
Ok(()) => {
let completed_event = ParallelEvent::RejectionReviewCompleted {
change_id: change_id.clone(),
outcome: crate::events::RejectionOutcome::Block,
};
self.apply_rejection_review_event_in_shared_state(&completed_event)
.await;
send_event(&self.event_tx, completed_event).await;
send_event(
&self.event_tx,
ParallelEvent::WorkspaceStatusUpdated {
change_id: change_id.clone(),
workspace_name: workspace_info.workspace_name.clone(),
status: crate::vcs::WorkspaceStatus::Blocked,
},
)
.await;
}
Err(error) => {
let failed_event = ParallelEvent::RejectionReviewFailed {
change_id: change_id.clone(),
error: error.to_string(),
};
self.apply_rejection_review_event_in_shared_state(&failed_event)
.await;
send_event(&self.event_tx, failed_event).await;
}
}
}
Err(error) => {
error!(
"Deferred rejection review failed for '{}': {}",
change_id, error
);
let failed_event = ParallelEvent::RejectionReviewFailed {
change_id: change_id.clone(),
error: format!("Rejecting review failed after deferred handoff: {}", error),
};
self.apply_rejection_review_event_in_shared_state(&failed_event)
.await;
send_event(&self.event_tx, failed_event).await;
}
}
}
#[allow(dead_code)]
pub(super) async fn retry_deferred_rejection_reviews(&mut self) {
let Some(shared) = &self.shared_orchestrator_state else {
return;
};
let (lane_occupied, reject_wait_ids) = {
let guard = shared.read().await;
(
guard.is_base_mutating_lane_occupied(),
guard.reject_wait_change_ids(),
)
};
self.reject_wait_changes = reject_wait_ids.into_iter().collect();
if lane_occupied || self.reject_wait_changes.is_empty() {
return;
}
let Some(change_id) = self.reject_wait_changes.iter().min().cloned() else {
return;
};
self.retry_deferred_rejection_review_for(change_id).await;
}
pub(super) async fn check_dynamic_queue_and_add_changes(
&mut self,
queued: &mut Vec<crate::openspec::Change>,
in_flight: &HashSet<String>,
reanalysis_reason: &mut ReanalysisReason,
) -> bool {
if let Some(queue) = &self.dynamic_queue {
let mut queue_changed = false;
while let Some(dynamic_id) = queue.pop().await {
if !queued.iter().any(|c| c.id == dynamic_id) && !in_flight.contains(&dynamic_id) {
match crate::openspec::list_changes_native() {
Ok(all_changes) => {
if let Some(new_change) =
all_changes.into_iter().find(|c| c.id == dynamic_id)
{
info!("Dynamically adding change to execution: {}", dynamic_id);
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::info(format!(
"Dynamically added to parallel execution: {}",
dynamic_id
))),
)
.await;
queued.push(new_change);
queue_changed = true;
} else {
warn!(
"Dynamically added change '{}' not found in openspec",
dynamic_id
);
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::warn(format!(
"Queue reconciliation pending for '{}': candidate_not_found",
dynamic_id
))),
)
.await;
}
}
Err(e) => {
warn!(
"Failed to load dynamically added change '{}': {}",
dynamic_id, e
);
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::warn(format!(
"Queue reconciliation pending for '{}': candidate_load_failed ({})",
dynamic_id, e
))),
)
.await;
}
}
} else if in_flight.contains(&dynamic_id) {
debug!(
"Ignoring dynamic queue entry '{}' because it is already in-flight",
dynamic_id
);
} else {
debug!(
"Ignoring dynamic queue entry '{}' because it is already queued",
dynamic_id
);
}
}
if queue_changed {
let mut last_change = self.last_queue_change_at.lock().await;
*last_change = Some(std::time::Instant::now());
*reanalysis_reason = ReanalysisReason::QueueNotification;
info!("Queue changed, scheduler state now requires re-analysis");
}
queue_changed
} else {
false
}
}
pub(super) fn should_emit_queue_reconciliation_diagnostic(
&mut self,
change_id: &str,
reason: &str,
) -> bool {
self.queue_reconciliation_diagnostics_seen
.insert((change_id.to_string(), reason.to_string()))
}
async fn emit_queue_reconciliation_diagnostic(
&mut self,
level: QueueReconciliationDiagnosticLevel,
change_id: &str,
reason: &str,
) {
if !self.should_emit_queue_reconciliation_diagnostic(change_id, reason) {
debug!(
change_id,
reason, "Suppressing repeated queue reconciliation diagnostic"
);
return;
}
self.emit_queue_reconciliation_diagnostic_without_dedupe(level, change_id, reason)
.await;
}
async fn emit_queue_reconciliation_diagnostic_without_dedupe(
&self,
level: QueueReconciliationDiagnosticLevel,
change_id: &str,
reason: &str,
) {
let message = match level {
QueueReconciliationDiagnosticLevel::Info => LogEntry::info(format!(
"Queue reconciliation deferred for '{}': {}",
change_id, reason
)),
QueueReconciliationDiagnosticLevel::Warn => LogEntry::warn(format!(
"Queue reconciliation pending for '{}': {}",
change_id, reason
)),
};
send_event(&self.event_tx, ParallelEvent::Log(message)).await;
}
pub(super) async fn reconcile_queued_candidates_from_shared_state(
&mut self,
queued: &mut Vec<crate::openspec::Change>,
in_flight: &HashSet<String>,
) -> QueueReconciliationOutcome {
let Some(shared_state) = &self.shared_orchestrator_state else {
return QueueReconciliationOutcome::default();
};
let (mut queued_intent_ids, active_ids_from_reducer, terminal_error_ids, merge_wait_ids) =
match shared_state.try_read() {
Ok(state) => {
let terminal_error_ids = state
.initial_change_ids()
.iter()
.filter(|id| state.is_terminal_error_change(id))
.cloned()
.collect::<std::collections::HashSet<_>>();
(
state.queued_change_ids(),
state.active_change_ids(),
terminal_error_ids,
state.merge_wait_change_ids(),
)
}
Err(_) => return QueueReconciliationOutcome::default(),
};
let reducer_active_set: std::collections::HashSet<String> =
active_ids_from_reducer.into_iter().collect();
let reducer_merge_wait_set: std::collections::HashSet<String> =
merge_wait_ids.into_iter().collect();
let base_branch_for_archived_dirty_scan = match self
.workspace_manager
.ensure_original_branch_initialized()
.await
{
Ok(branch) => Some(branch),
Err(error) => {
warn!(
"Failed to determine base branch during archived dirty queue reconciliation: {}",
error
);
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::warn(format!(
"Queue reconciliation skipped archived-dirty worktree scan: failed_to_determine_base_branch ({})",
error
))),
)
.await;
None
}
};
match self.workspace_manager.list_worktree_change_ids().await {
Ok(worktree_change_ids) => {
for worktree_change_id in worktree_change_ids {
if terminal_error_ids.contains(&worktree_change_id) {
self.emit_queue_reconciliation_diagnostic(
QueueReconciliationDiagnosticLevel::Info,
&worktree_change_id,
"terminal_error_retry_required",
)
.await;
continue;
}
if reducer_merge_wait_set.contains(&worktree_change_id) {
self.emit_queue_reconciliation_diagnostic(
QueueReconciliationDiagnosticLevel::Info,
&worktree_change_id,
"manual_merge_wait",
)
.await;
continue;
}
if queued_intent_ids.iter().any(|id| id == &worktree_change_id)
|| in_flight.contains(&worktree_change_id)
|| reducer_active_set.contains(&worktree_change_id)
|| Self::is_post_archive_merge_active_for(&worktree_change_id)
{
continue;
}
let archived_dirty = if let Some(base_branch) =
&base_branch_for_archived_dirty_scan
{
match self
.workspace_manager
.find_existing_workspace(&worktree_change_id)
.await
{
Ok(Some(workspace)) => archived_dirty_repair_candidate_from_workspace(
&worktree_change_id,
&workspace.path,
base_branch,
)
.await
.is_some(),
Ok(None) => false,
Err(error) => {
warn!(
change_id = %worktree_change_id,
"Failed to find workspace during archived dirty queue reconciliation: {}",
error
);
false
}
}
} else {
false
};
if archived_dirty {
info!(
change_id = %worktree_change_id,
"Queue reconciliation discovered archived dirty workspace without reducer queued intent"
);
queued_intent_ids.push(worktree_change_id);
}
}
}
Err(e) => {
warn!(
"Failed to list worktree change ids during archived dirty queue reconciliation: {}",
e
);
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::warn(format!(
"Queue reconciliation skipped archived-dirty worktree scan: failed_to_list_worktrees ({})",
e
))),
)
.await;
}
}
if queued_intent_ids.is_empty() {
return QueueReconciliationOutcome::default();
}
let mut known_changes = match crate::openspec::list_changes_native_from(&self.repo_root) {
Ok(changes) => changes,
Err(e) => {
warn!(
"Failed to load OpenSpec changes during queue reconciliation: {}",
e
);
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::warn(format!(
"Queue reconciliation skipped: failed_to_load_changes ({})",
e
))),
)
.await;
return QueueReconciliationOutcome::default();
}
};
let mut known_by_id: std::collections::HashMap<String, crate::openspec::Change> =
known_changes
.drain(..)
.map(|change| (change.id.clone(), change))
.collect();
let mut outcome = QueueReconciliationOutcome::default();
for queued_id in queued_intent_ids {
if terminal_error_ids.contains(&queued_id) {
self.emit_queue_reconciliation_diagnostic(
QueueReconciliationDiagnosticLevel::Info,
&queued_id,
"terminal_error_retry_required",
)
.await;
continue;
}
if reducer_merge_wait_set.contains(&queued_id) {
self.emit_queue_reconciliation_diagnostic(
QueueReconciliationDiagnosticLevel::Info,
&queued_id,
"manual_merge_wait",
)
.await;
continue;
}
if queued.iter().any(|change| change.id == queued_id) {
continue;
}
if in_flight.contains(&queued_id) || reducer_active_set.contains(&queued_id) {
self.emit_queue_reconciliation_diagnostic(
QueueReconciliationDiagnosticLevel::Info,
&queued_id,
"already_active",
)
.await;
continue;
}
if Self::is_post_archive_merge_active_for(&queued_id) {
self.emit_queue_reconciliation_diagnostic(
QueueReconciliationDiagnosticLevel::Info,
&queued_id,
"post_archive_merge_active",
)
.await;
continue;
}
match known_by_id.remove(&queued_id) {
Some(change) => {
info!(
"Queue reconciliation adding reducer-queued change candidate: {}",
queued_id
);
queued.push(change);
outcome.queued_added += 1;
}
None => {
let archived_dirty_candidate = if let Some(base_branch) =
&base_branch_for_archived_dirty_scan
{
match self
.workspace_manager
.find_existing_workspace(&queued_id)
.await
{
Ok(Some(workspace)) => {
archived_dirty_repair_candidate_from_workspace(
&queued_id,
&workspace.path,
base_branch,
)
.await
}
Ok(None) => None,
Err(error) => {
warn!(
change_id = %queued_id,
"Failed to find workspace for reducer-queued archived dirty repair candidate: {}",
error
);
None
}
}
} else {
None
};
if let Some(change) = archived_dirty_candidate {
info!(
"Queue reconciliation adding archived dirty repair candidate: {}",
queued_id
);
self.emit_queue_reconciliation_diagnostic(
QueueReconciliationDiagnosticLevel::Info,
&queued_id,
"archived_dirty_repair_candidate",
)
.await;
queued.push(change);
outcome.repair_added += 1;
} else if self.should_emit_queue_reconciliation_diagnostic(
&queued_id,
"candidate_not_found",
) {
warn!(
"Queue reconciliation could not load reducer-queued change '{}': candidate_not_found",
queued_id
);
self.emit_queue_reconciliation_diagnostic_without_dedupe(
QueueReconciliationDiagnosticLevel::Warn,
&queued_id,
"candidate_not_found",
)
.await;
} else {
debug!(
change_id = %queued_id,
reason = "candidate_not_found",
"Suppressing repeated queue reconciliation candidate_not_found warning"
);
}
}
}
}
if outcome.queued_added > 0 {
let mut last_change = self.last_queue_change_at.lock().await;
*last_change = Some(std::time::Instant::now());
}
outcome
}
pub(super) async fn emit_no_analysis_diagnostic(
&mut self,
queued: &[crate::openspec::Change],
in_flight: &HashSet<String>,
max_parallelism: usize,
reason: &str,
) {
let reducer_queued = self
.shared_orchestrator_state
.as_ref()
.and_then(|state| state.try_read().ok())
.map(|state| state.queued_change_ids())
.unwrap_or_default();
if reducer_queued.is_empty() {
return;
}
let diagnostic_key = (
reducer_queued.clone(),
queued.len(),
in_flight.len(),
reason.to_string(),
);
if !self.no_analysis_diagnostics_seen.insert(diagnostic_key) {
debug!(reason, "Suppressing repeated no-analysis diagnostic");
return;
}
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::info(format!(
"No analysis started despite reducer-visible queued work: reason={}, reducer_queued={:?}, local_queued={}, in_flight={}, max_parallelism={}",
reason,
reducer_queued,
queued.len(),
in_flight.len(),
max_parallelism
))),
)
.await;
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn perform_reanalysis_and_dispatch<F>(
&mut self,
queued: &mut Vec<crate::openspec::Change>,
in_flight: &mut HashSet<String>,
max_parallelism: usize,
iteration: u32,
reanalysis_reason: ReanalysisReason,
analyzer: &F,
semaphore: Arc<Semaphore>,
join_set: &mut JoinSet<WorkspaceResult>,
cleanup_guard: &mut WorkspaceCleanupGuard,
) -> Result<(bool, u32)>
where
for<'a> F: Fn(
&'a [crate::openspec::Change],
&'a [String],
u32,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = crate::analyzer::AnalysisResult> + Send + 'a>,
> + Send
+ Sync,
{
let available_slots = self.calculate_available_slots(max_parallelism, in_flight);
let previous_available_slots = self.last_available_slots.replace(available_slots);
let slot_recovered = matches!(previous_available_slots, Some(0)) && available_slots > 0;
if slot_recovered && matches!(reanalysis_reason, ReanalysisReason::QueueNotification) {
info!(
previous_available_slots = previous_available_slots.unwrap_or_default(),
available_slots,
queued = queued.len(),
"Execution capacity recovered; promoting queue re-analysis trigger"
);
}
if available_slots == 0 {
info!(
"Re-analysis deferred: no available slots (max: {}, in_flight: {}, queued: {})",
max_parallelism,
in_flight.len(),
queued.len()
);
self.emit_no_analysis_diagnostic(
queued,
in_flight,
max_parallelism,
"no_available_slots",
)
.await;
return Ok((false, iteration));
}
let effective_reason =
if slot_recovered && matches!(reanalysis_reason, ReanalysisReason::QueueNotification) {
ReanalysisReason::SlotRecovery
} else {
reanalysis_reason
};
let bypass_debounce = matches!(
effective_reason,
ReanalysisReason::SlotRecovery
| ReanalysisReason::ResolveCompletion
| ReanalysisReason::RepairCandidate
);
let should_analyze = if iteration == 1 {
info!("First iteration, skipping debounce check");
true
} else {
self.should_reanalyze(bypass_debounce).await
};
if !should_analyze {
info!("Debounce active, waiting for timer or queue notification");
self.emit_no_analysis_diagnostic(queued, in_flight, max_parallelism, "debounce_active")
.await;
return Ok((false, iteration));
}
let (executable_changes, skipped_changes) = self.filter_executable_changes(queued);
for (change_id, reason) in skipped_changes {
send_event(
&self.event_tx,
ParallelEvent::ChangeSkipped { change_id, reason },
)
.await;
}
*queued = executable_changes;
if queued.is_empty() {
info!("All queued changes skipped due to failed dependencies");
self.emit_no_analysis_diagnostic(
queued,
in_flight,
max_parallelism,
"local_queue_empty_after_reconciliation",
)
.await;
if in_flight.is_empty() {
return Ok((true, iteration)); } else {
return Ok((false, iteration)); }
}
info!(
"Re-analysis triggered: iteration={}, queued={}, in_flight={}, trigger={}",
iteration,
queued.len(),
in_flight.len(),
effective_reason
);
send_event(
&self.event_tx,
ParallelEvent::AnalysisStarted {
remaining_changes: queued.len(),
},
)
.await;
let in_flight_ids: Vec<String> = in_flight.iter().cloned().collect();
let analysis_result = analyzer(queued, &in_flight_ids, iteration).await;
if analysis_result.order.is_empty() {
warn!("No order returned from analysis");
if in_flight.is_empty() {
return Ok((true, iteration)); } else {
return Ok((false, iteration)); }
}
self.failed_tracker
.set_dependencies(analysis_result.dependencies.clone());
self.change_dependencies = analysis_result.dependencies.clone();
let available_slots = self.calculate_available_slots(max_parallelism, in_flight);
info!(
"Available slots after analysis: {} (max: {}, in_flight: {}, queued: {})",
available_slots,
max_parallelism,
in_flight.len(),
queued.len()
);
let selected_changes = self
.select_changes_for_dispatch(&analysis_result, available_slots, in_flight)
.await;
let new_iteration = if !selected_changes.is_empty() {
let base_revision = self
.workspace_manager
.get_current_revision()
.await
.map_err(OrchestratorError::from)?;
info!(
"Dispatching {} changes (iteration {}): {:?}",
selected_changes.len(),
iteration,
selected_changes
);
for change_id in &selected_changes {
if let Err(e) = self
.dispatch_change_to_workspace(
change_id.clone(),
base_revision.clone(),
semaphore.clone(),
join_set,
in_flight,
cleanup_guard,
)
.await
{
let message = format!("Failed to dispatch change '{}': {}", change_id, e);
self.failed_tracker.mark_failed(change_id);
send_event(
&self.event_tx,
ParallelEvent::ProcessingError {
id: change_id.clone(),
error: message.clone(),
},
)
.await;
send_event(
&self.event_tx,
ParallelEvent::Log(LogEntry::error(message.clone())),
)
.await;
error!("{}", message);
}
}
let dispatched_set: std::collections::HashSet<_> = selected_changes.iter().collect();
queued.retain(|c| !dispatched_set.contains(&c.id));
iteration + 1
} else {
iteration
};
Ok((false, new_iteration))
}
}