use super::{startup_mode_manual, BoxError, PipelineContext, PipelineEvent, PipelineSupervisor};
use crate::id_conversions::StageIdExt;
use crate::messaging::SubscriptionPoller;
use crate::supervised_base::EventLoopDirective;
use obzenflow_core::StageId;
use std::time::Duration;
/// Runs one iteration of the event loop while the pipeline waits for its stages to start.
///
/// Each call performs three steps:
///
/// 1. Every non-source stage (a stage with at least one upstream stage in the topology)
///    whose supervisor reports `is_ready()` is recorded in `context.running_stages`.
/// 2. The completion subscription is polled once. A `StageLifecycle` event carrying
///    `StageLifecycleEvent::Running` records the reporting stage (source or not) in
///    `context.running_stages`. When no event is available the task sleeps for 10 ms.
/// 3. If every non-source stage is in `context.running_stages`, the function returns
///    `Transition(PipelineEvent::Run)`; in manual startup mode it instead sleeps 100 ms
///    and returns `Continue`. Otherwise it returns `Continue`.
///
/// # Errors
///
/// Returns `Err` when `context.completion_subscription` is `None`. A polling failure is
/// not returned as `Err`; it is reported as `Transition(PipelineEvent::Error { .. })`.
pub(super) async fn dispatch_materialized(
    supervisor: &mut PipelineSupervisor,
    context: &mut PipelineContext,
) -> Result<EventLoopDirective<PipelineEvent>, BoxError> {
    use crate::messaging::PollResult;

    // Pick up non-source stages that already report ready, independently of whether a
    // lifecycle event for them is ever observed through the subscription below.
    let supervisors = &context.stage_supervisors;
    for (stage_id, stage) in supervisors.iter() {
        if stage.is_ready()
            && !context
                .topology
                .upstream_stages(stage_id.to_topology_id())
                .is_empty()
        {
            // `insert` returns true only when the stage was not yet recorded, so this is
            // logged at most once per stage.
            if context.running_stages.insert(*stage_id) {
                tracing::info!("Stage '{}' was already running", stage.stage_name());
            }
        }
    }

    let subscription = context
        .completion_subscription
        .as_mut()
        .ok_or("No subscription available - should have been initialized during materialization")?;

    match subscription.poll_next().await {
        PollResult::Event(envelope) => {
            let event = &envelope.event;
            context.last_system_event_id_seen = Some(event.id);

            if let obzenflow_core::event::SystemEventType::StageLifecycle {
                stage_id,
                event: obzenflow_core::event::StageLifecycleEvent::Running,
            } = &event.event
            {
                // Sources are recorded here as well, so `running_stages` may contain
                // stages that are not part of the non-source readiness check below.
                context.running_stages.insert(*stage_id);

                let stage_info = context
                    .topology
                    .stages()
                    .find(|s| s.id == stage_id.to_topology_id());
                let stage_name = stage_info
                    .map(|s| s.name.clone())
                    .unwrap_or_else(|| "unknown".to_string());

                if context
                    .topology
                    .upstream_stages(stage_id.to_topology_id())
                    .is_empty()
                {
                    tracing::debug!(
                        "Source stage '{}' is running (waiting for pipeline signal)",
                        stage_name
                    );
                } else {
                    tracing::info!("Stage '{}' is now running", stage_name);
                }
            }
        }
        PollResult::NoEvents => {
            // Back off briefly so an idle subscription does not turn this into a busy loop.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        PollResult::Error(e) => {
            tracing::error!("Error polling system journal in Awaiting: {}", e);
            return Ok(EventLoopDirective::Transition(PipelineEvent::Error {
                message: format!("System journal error: {e}"),
            }));
        }
    }

    let running_stages = &context.running_stages;
    let topology = &context.topology;

    // Readiness is judged on non-source stages only (those with at least one upstream).
    let non_source_stages: std::collections::HashSet<_> = topology
        .stages()
        .filter(|stage_info| !topology.upstream_stages(stage_info.id).is_empty())
        .map(|stage_info| StageId::from_topology_id(stage_info.id))
        .collect();

    // NOTE(review): with no non-source stages this is never true, so the function keeps
    // returning `Continue` — confirm that a source-only topology is not expected here.
    let all_ready = !non_source_stages.is_empty()
        && non_source_stages
            .iter()
            .all(|stage_id| running_stages.contains(stage_id));

    if all_ready {
        if startup_mode_manual() {
            if supervisor.should_log_manual_wait() {
                tracing::info!(
                    "All {} non-source stages are running (startup_mode=manual); waiting for external Run",
                    non_source_stages.len()
                );
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok(EventLoopDirective::Continue)
        } else {
            tracing::info!(
                "All {} non-source stages are running, starting pipeline",
                non_source_stages.len()
            );
            Ok(EventLoopDirective::Transition(PipelineEvent::Run))
        }
    } else {
        let waiting_for = non_source_stages.difference(running_stages).count();
        if waiting_for > 0 {
            // Derive the ready count from the non-source set itself: `running_stages`
            // can also hold source stages, which would inflate the numerator past the
            // denominator. `waiting_for` counts a subset, so this cannot underflow.
            let ready = non_source_stages.len() - waiting_for;
            tracing::debug!(
                "Waiting for {} more stage(s) to report running ({}/{} ready)",
                waiting_for,
                ready,
                non_source_stages.len()
            );
        }
        Ok(EventLoopDirective::Continue)
    }
}