use super::fsm::{FlowStopMode, PipelineEvent, PipelineState};
use crate::errors::FlowError;
use crate::stages::common::stage_handle::STOP_REASON_TIMEOUT;
use crate::supervised_base::{HandleError, StandardHandle, SupervisorHandle};
use obzenflow_core::event::SystemEvent;
use obzenflow_core::journal::Journal;
use obzenflow_core::StageId;
use obzenflow_topology::Topology;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use std::time::Duration;
/// Per-stage middleware configuration, keyed by stage and shared behind an `Arc`.
type MiddlewareStacks = Arc<HashMap<StageId, MiddlewareStackConfig>>;
/// Strings attached to each `(StageId, StageId)` pair, shared behind an `Arc`.
/// NOTE(review): presumably contract names on the edge `(from, to)` — confirm direction.
type ContractAttachments = Arc<HashMap<(StageId, StageId), Vec<String>>>;
/// Join metadata keyed by stage, shared behind an `Arc`.
type JoinMetadataMap = Arc<HashMap<StageId, crate::pipeline::JoinMetadata>>;
/// Flow metadata passed to [`FlowHandle::new`] alongside the supervisor handle and
/// metrics exporter; each field is moved into the identically named `FlowHandle` field.
pub(crate) struct FlowHandleExtras {
    /// Flow topology, if available.
    pub topology: Option<Arc<Topology>>,
    /// Name of the flow, later returned by `FlowHandle::flow_name`.
    pub flow_name: String,
    /// Middleware configuration per stage, if available.
    pub middleware_stacks: Option<MiddlewareStacks>,
    /// Strings attached to stage pairs, if available.
    pub contract_attachments: Option<ContractAttachments>,
    /// Join metadata per stage, if available.
    pub join_metadata: Option<JoinMetadataMap>,
    /// System-event journal; `run_with_metrics` polls it for a metrics-drain signal.
    pub system_journal: Option<Arc<dyn Journal<SystemEvent>>>,
}
/// Serializable description of the middleware configured on one stage.
///
/// `stack` lists the middleware names. Each optional section holds that middleware's
/// settings as opaque JSON and is omitted from serialized output when `None`.
/// Field order here is the field order of the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MiddlewareStackConfig {
    /// Middleware names. NOTE(review): presumably in application order — confirm.
    pub stack: Vec<String>,
    /// Circuit-breaker settings as raw JSON, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub circuit_breaker: Option<serde_json::Value>,
    /// Rate-limiter settings as raw JSON, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rate_limiter: Option<serde_json::Value>,
    /// Retry settings as raw JSON, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry: Option<serde_json::Value>,
    /// Backpressure settings as raw JSON, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backpressure: Option<serde_json::Value>,
}
impl MiddlewareStackConfig {
    /// Builds a config that records only the middleware names in `stack`.
    ///
    /// Every per-middleware settings section (`circuit_breaker`, `rate_limiter`,
    /// `retry`, `backpressure`) is left as `None`.
    pub fn names_only(stack: Vec<String>) -> Self {
        // Only the names are known here; all tuning sections stay unset.
        let (circuit_breaker, rate_limiter, retry, backpressure) = (None, None, None, None);
        Self {
            stack,
            circuit_breaker,
            rate_limiter,
            retry,
            backpressure,
        }
    }
}
/// Caller-facing handle for controlling and observing a flow's pipeline supervisor.
///
/// Besides the supervisor handle, it carries the optional metrics exporter and flow
/// metadata so callers can inspect them through accessor methods.
pub struct FlowHandle {
    /// Supervisor handle: receives `PipelineEvent`s and publishes `PipelineState`.
    handle: StandardHandle<PipelineEvent, PipelineState>,
    /// Metrics exporter, if one was configured.
    metrics_exporter: Option<Arc<dyn obzenflow_core::metrics::MetricsExporter>>,
    /// Flow topology, if available.
    topology: Option<Arc<Topology>>,
    /// Name of the flow.
    flow_name: String,
    /// Middleware configuration per stage, if available.
    middleware_stacks: Option<MiddlewareStacks>,
    /// Strings attached to stage pairs, if available.
    contract_attachments: Option<ContractAttachments>,
    /// Join metadata per stage, if available.
    join_metadata: Option<JoinMetadataMap>,
    /// System-event journal, polled by `run_with_metrics` for a metrics-drain signal.
    system_journal: Option<Arc<dyn Journal<SystemEvent>>>,
}
impl FlowHandle {
pub(crate) fn new(
handle: StandardHandle<PipelineEvent, PipelineState>,
metrics_exporter: Option<Arc<dyn obzenflow_core::metrics::MetricsExporter>>,
extras: FlowHandleExtras,
) -> Self {
let FlowHandleExtras {
topology,
flow_name,
middleware_stacks,
contract_attachments,
join_metadata,
system_journal,
} = extras;
Self {
handle,
metrics_exporter,
topology,
flow_name,
middleware_stacks,
contract_attachments,
system_journal,
join_metadata,
}
}
pub async fn start(&self) -> Result<(), FlowError> {
let current_state = self.current_state();
tracing::debug!(
"FlowHandle::start() - Current pipeline state: {:?}",
current_state
);
tracing::info!("FlowHandle::start() - Sending PipelineEvent::Run to start flow");
self.send_event(PipelineEvent::Run).await
}
pub async fn run(self) -> Result<(), FlowError> {
let current_state = self.current_state();
tracing::debug!(
"FlowHandle::run() - Current pipeline state: {:?}",
current_state
);
tracing::info!("FlowHandle::run() - Sending PipelineEvent::Run to start flow");
self.send_event(PipelineEvent::Run).await?;
tracing::info!("FlowHandle::run() - Run event sent, waiting for completion");
let state_rx = self.state_receiver();
let result = self.wait_for_completion().await;
tracing::info!(
"FlowHandle::run() - wait_for_completion returned: {:?}",
result
);
if let Err(e) = result {
tracing::error!("FlowHandle::run() failed: {}", e);
return Err(e);
}
let final_state = state_rx.borrow().clone();
match final_state {
PipelineState::Failed { reason, .. } => Err(FlowError::ExecutionFailed(Box::new(
io::Error::other(reason),
))),
PipelineState::AbortRequested { reason, .. } => Err(FlowError::ExecutionFailed(
Box::new(io::Error::other(format!("{reason:?}"))),
)),
_ => Ok(()),
}
}
pub async fn run_with_metrics(
self,
) -> Result<Option<Arc<dyn obzenflow_core::metrics::MetricsExporter>>, FlowError> {
self.send_event(PipelineEvent::Run).await?;
let metrics = self.metrics_exporter.clone();
let system_journal = self.system_journal.clone();
self.wait_for_completion().await?;
if let Some(journal) = system_journal {
use obzenflow_core::event::system_event::MetricsCoordinationEvent;
use obzenflow_core::event::SystemEventType;
use std::time::Duration;
let deadline = std::time::Instant::now() + Duration::from_secs(10);
while std::time::Instant::now() < deadline {
match journal.read_last_n(256).await {
Ok(events) => {
let drained = events.iter().any(|envelope| {
matches!(
envelope.event.event,
SystemEventType::MetricsCoordination(
MetricsCoordinationEvent::Drained
| MetricsCoordinationEvent::Shutdown
)
)
});
if drained {
break;
}
}
Err(e) => {
tracing::warn!(
journal_error = %e,
"Failed to read system journal while waiting for metrics drain"
);
break;
}
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
Ok(metrics)
}
pub async fn stop(&self) -> Result<(), FlowError> {
self.stop_cancel().await
}
pub async fn stop_cancel(&self) -> Result<(), FlowError> {
if !self.is_running() {
return Ok(());
}
self.send_event(PipelineEvent::StopRequested {
mode: FlowStopMode::Cancel,
reason: None,
})
.await
}
#[doc(hidden)]
pub async fn stop_cancel_timeout(&self) -> Result<(), FlowError> {
if !self.is_running() {
return Ok(());
}
self.send_event(PipelineEvent::StopRequested {
mode: FlowStopMode::Cancel,
reason: Some(STOP_REASON_TIMEOUT.to_string()),
})
.await
}
pub async fn stop_graceful(&self, timeout: Duration) -> Result<(), FlowError> {
if !self.is_running() {
return Ok(());
}
self.send_event(PipelineEvent::StopRequested {
mode: FlowStopMode::Graceful { timeout },
reason: None,
})
.await
}
pub async fn shutdown(&self) -> Result<(), FlowError> {
self.stop().await
}
pub async fn abort(&self, reason: &str) -> Result<(), FlowError> {
self.send_event(PipelineEvent::Error {
message: format!("Force abort: {reason}"),
})
.await
}
pub fn is_running(&self) -> bool {
self.handle.is_running()
}
pub fn state_receiver(&self) -> tokio::sync::watch::Receiver<PipelineState> {
self.handle.state_receiver()
}
pub fn metrics_exporter(&self) -> Option<Arc<dyn obzenflow_core::metrics::MetricsExporter>> {
self.metrics_exporter.clone()
}
pub fn topology(&self) -> Option<Arc<Topology>> {
self.topology.clone()
}
pub fn middleware_stacks(&self) -> Option<MiddlewareStacks> {
self.middleware_stacks.clone()
}
pub fn contract_attachments(&self) -> Option<ContractAttachments> {
self.contract_attachments.clone()
}
pub fn join_metadata(&self) -> Option<JoinMetadataMap> {
self.join_metadata.clone()
}
pub fn system_journal(&self) -> Option<Arc<dyn Journal<SystemEvent>>> {
self.system_journal.clone()
}
pub fn flow_name(&self) -> &str {
&self.flow_name
}
pub async fn render_metrics(&self) -> Result<String, FlowError> {
if let Some(ref exporter) = self.metrics_exporter {
exporter.render_metrics().map_err(|e| {
FlowError::ExecutionFailed(Box::new(std::io::Error::other(e.to_string())))
})
} else {
Err(FlowError::ExecutionFailed(Box::new(std::io::Error::other(
"No metrics exporter configured",
))))
}
}
}
#[async_trait::async_trait]
impl SupervisorHandle for FlowHandle {
type Event = PipelineEvent;
type State = PipelineState;
type Error = FlowError;
async fn send_event(&self, event: Self::Event) -> Result<(), Self::Error> {
self.handle.send_event(event).await.map_err(|e| match e {
HandleError::SupervisorNotRunning => {
FlowError::ExecutionFailed(Box::new(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Pipeline supervisor is not running",
)))
}
HandleError::SupervisorFailed(msg) => {
FlowError::ExecutionFailed(Box::new(std::io::Error::other(msg)))
}
HandleError::SupervisorPanicked(msg) => FlowError::ExecutionFailed(Box::new(
std::io::Error::other(format!("Task panicked: {msg}")),
)),
_ => FlowError::ExecutionFailed(Box::new(std::io::Error::other(e.to_string()))),
})
}
fn current_state(&self) -> Self::State {
self.handle.current_state()
}
async fn wait_for_completion(self) -> Result<(), Self::Error> {
self.handle
.wait_for_completion()
.await
.map_err(|e| match e {
HandleError::SupervisorNotRunning => {
FlowError::ExecutionFailed(Box::new(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Pipeline supervisor is not running",
)))
}
HandleError::SupervisorFailed(msg) => {
FlowError::ExecutionFailed(Box::new(std::io::Error::other(msg)))
}
HandleError::SupervisorPanicked(msg) => FlowError::ExecutionFailed(Box::new(
std::io::Error::other(format!("Task panicked: {msg}")),
)),
_ => FlowError::ExecutionFailed(Box::new(std::io::Error::other(e.to_string()))),
})
}
}