mod extended;
mod retrieval;
pub use extended::RouteCallResult;
use std::collections::BTreeMap;
use std::sync::Arc;
use rustvello_core::broker::Broker;
use rustvello_core::client_data_store::ClientDataStoreManager;
use rustvello_core::error::{RustvelloResult, TaskError};
use rustvello_core::orchestrator::Orchestrator;
use rustvello_core::state_backend::StateBackend;
use rustvello_core::trigger::TriggerManager;
use rustvello_proto::call::CallDTO;
use rustvello_proto::identifiers::{InvocationId, RunnerId, TaskId};
use rustvello_proto::invocation::{InvocationDTO, InvocationHistory};
use rustvello_proto::status::{InvocationStatus, InvocationStatusRecord};
pub struct OrchestratorCoordinator {
pub(crate) orchestrator: Arc<dyn Orchestrator>,
pub(crate) state_backend: Arc<dyn StateBackend>,
pub(crate) broker: Arc<dyn Broker>,
pub(crate) client_data_store: Arc<ClientDataStoreManager>,
pub(crate) trigger_manager: Option<TriggerManager>,
auto_purge_delay_secs: u64,
}
fn hours_to_purge_secs(hours: f64) -> u64 {
if !hours.is_finite() || hours < 0.0 {
tracing::warn!(
auto_purge_hours = hours,
"auto_final_invocation_purge_hours is not a positive finite number; \
auto-purge disabled (effective value: 0 hours)"
);
return 0;
}
let secs = hours * 3600.0;
if secs >= u64::MAX as f64 {
tracing::warn!(
auto_purge_hours = hours,
"auto_final_invocation_purge_hours is too large; clamping to u64::MAX seconds"
);
return u64::MAX;
}
secs as u64
}
impl OrchestratorCoordinator {
pub fn new(
orchestrator: Arc<dyn Orchestrator>,
state_backend: Arc<dyn StateBackend>,
broker: Arc<dyn Broker>,
client_data_store: Arc<ClientDataStoreManager>,
trigger_manager: Option<TriggerManager>,
auto_purge_hours: f64,
) -> Self {
Self {
orchestrator,
state_backend,
broker,
client_data_store,
trigger_manager,
auto_purge_delay_secs: hours_to_purge_secs(auto_purge_hours),
}
}
pub async fn set_invocation_status(
&self,
invocation_id: &InvocationId,
status: InvocationStatus,
runner_id: &RunnerId,
) -> RustvelloResult<InvocationStatusRecord> {
let (task_id, arguments) = if self.trigger_manager.is_some() {
self.get_trigger_context(invocation_id).await
} else {
(TaskId::new("_", "_"), BTreeMap::new())
};
self.set_invocation_status_with_context(
invocation_id,
status,
runner_id,
&task_id,
arguments,
)
.await
}
pub async fn set_invocation_status_with_context(
&self,
invocation_id: &InvocationId,
status: InvocationStatus,
runner_id: &RunnerId,
task_id: &TaskId,
arguments: BTreeMap<String, String>,
) -> RustvelloResult<InvocationStatusRecord> {
let record = self
.orchestrator
.set_invocation_status(invocation_id, status, Some(runner_id))
.await?;
if status.is_terminal() {
self.orchestrator.release_waiters(invocation_id).await?;
if self.auto_purge_delay_secs > 0 {
self.orchestrator.schedule_auto_purge(invocation_id).await?;
}
}
let history = InvocationHistory::new(invocation_id.clone(), record.clone(), None)
.with_runner(runner_id.clone());
self.state_backend.add_history(&history).await?;
if let Some(ref tm) = self.trigger_manager {
let ctx = rustvello_proto::trigger::StatusContext {
invocation_id: invocation_id.clone(),
task_id: task_id.clone(),
status,
arguments,
};
tm.report_status_change(&ctx).await?;
}
Ok(record)
}
pub async fn register_invocations(
&self,
invocations: &[(InvocationDTO, CallDTO)],
runner_id: &RunnerId,
) -> RustvelloResult<()> {
let mut inv_ids = Vec::with_capacity(invocations.len());
for (inv_dto, call_dto) in invocations {
self.state_backend
.upsert_invocation(inv_dto, call_dto)
.await?;
let record = self
.orchestrator
.register_invocation_with_id(&inv_dto.invocation_id, call_dto, Some(runner_id))
.await?;
let history =
InvocationHistory::new(inv_dto.invocation_id.clone(), record.clone(), None)
.with_runner(runner_id.clone());
self.state_backend.add_history(&history).await?;
if let Some(ref tm) = self.trigger_manager {
let ctx = rustvello_proto::trigger::StatusContext {
invocation_id: inv_dto.invocation_id.clone(),
task_id: inv_dto.task_id.clone(),
status: record.status,
arguments: call_dto.serialized_arguments.0.clone(),
};
tm.report_status_change(&ctx).await?;
}
inv_ids.push(inv_dto.invocation_id.clone());
}
self.broker.route_invocations(&inv_ids).await?;
Ok(())
}
pub async fn set_invocation_result(
&self,
invocation_id: &InvocationId,
result: &str,
runner_id: &RunnerId,
) -> RustvelloResult<()> {
let (task_id, arguments) = if self.trigger_manager.is_some() {
self.get_trigger_context(invocation_id).await
} else {
(TaskId::new("_", "_"), BTreeMap::new())
};
self.set_invocation_result_with_context(
invocation_id,
result,
runner_id,
&task_id,
arguments,
)
.await
}
pub async fn set_invocation_result_with_context(
&self,
invocation_id: &InvocationId,
result: &str,
runner_id: &RunnerId,
task_id: &TaskId,
arguments: BTreeMap<String, String>,
) -> RustvelloResult<()> {
self.state_backend
.store_result(invocation_id, result)
.await?;
self.set_invocation_status_with_context(
invocation_id,
InvocationStatus::Success,
runner_id,
task_id,
arguments.clone(),
)
.await?;
if let Some(ref tm) = self.trigger_manager {
let result_value: serde_json::Value =
serde_json::from_str(result).unwrap_or_else(|e| {
tracing::warn!(
invocation_id = %invocation_id,
"Failed to parse result as JSON for trigger: {e}; wrapping as string"
);
serde_json::Value::String(result.to_owned())
});
let ctx = rustvello_proto::trigger::ResultContext {
invocation_id: invocation_id.clone(),
task_id: task_id.clone(),
result: result_value,
arguments,
};
tm.report_result(&ctx).await?;
}
Ok(())
}
pub async fn set_invocation_exception(
&self,
invocation_id: &InvocationId,
error_type: &str,
error_message: &str,
runner_id: &RunnerId,
) -> RustvelloResult<()> {
let (task_id, arguments) = if self.trigger_manager.is_some() {
self.get_trigger_context(invocation_id).await
} else {
(TaskId::new("_", "_"), BTreeMap::new())
};
self.set_invocation_exception_with_context(
invocation_id,
error_type,
error_message,
runner_id,
&task_id,
arguments,
)
.await
}
pub async fn set_invocation_exception_with_context(
&self,
invocation_id: &InvocationId,
error_type: &str,
error_message: &str,
runner_id: &RunnerId,
task_id: &TaskId,
arguments: BTreeMap<String, String>,
) -> RustvelloResult<()> {
let task_error = TaskError {
error_type: error_type.to_owned(),
message: error_message.to_owned(),
traceback: None,
};
self.state_backend
.store_error(invocation_id, &task_error)
.await?;
self.set_invocation_status_with_context(
invocation_id,
InvocationStatus::Failed,
runner_id,
task_id,
arguments.clone(),
)
.await?;
if let Some(ref tm) = self.trigger_manager {
let ctx = rustvello_proto::trigger::ExceptionContext {
invocation_id: invocation_id.clone(),
task_id: task_id.clone(),
error_type: error_type.to_owned(),
error_message: error_message.to_owned(),
arguments,
};
tm.report_failure(&ctx).await?;
}
Ok(())
}
pub async fn set_invocation_retry(
&self,
invocation_id: &InvocationId,
runner_id: &RunnerId,
) -> RustvelloResult<()> {
let (task_id, arguments) = if self.trigger_manager.is_some() {
self.get_trigger_context(invocation_id).await
} else {
(TaskId::new("_", "_"), BTreeMap::new())
};
self.set_invocation_retry_with_context(invocation_id, runner_id, &task_id, arguments)
.await
}
pub async fn set_invocation_retry_with_context(
&self,
invocation_id: &InvocationId,
runner_id: &RunnerId,
task_id: &TaskId,
arguments: BTreeMap<String, String>,
) -> RustvelloResult<()> {
self.set_invocation_status_with_context(
invocation_id,
InvocationStatus::Retry,
runner_id,
task_id,
arguments,
)
.await?;
self.orchestrator
.increment_invocation_retries(invocation_id)
.await?;
self.broker.route_invocation(invocation_id).await?;
Ok(())
}
pub(crate) async fn get_invocation_arguments(
&self,
invocation_id: &InvocationId,
) -> BTreeMap<String, String> {
let inv_dto = match self.state_backend.get_invocation(invocation_id).await {
Ok(dto) => dto,
Err(_) => return BTreeMap::new(),
};
match self.state_backend.get_call(&inv_dto.call_id).await {
Ok(call) => call.serialized_arguments.0,
Err(_) => BTreeMap::new(),
}
}
pub async fn get_trigger_context(
&self,
invocation_id: &InvocationId,
) -> (TaskId, BTreeMap<String, String>) {
let inv_dto = match self.state_backend.get_invocation(invocation_id).await {
Ok(dto) => dto,
Err(_) => return (TaskId::new("unknown", "unknown"), BTreeMap::new()),
};
let args = match self.state_backend.get_call(&inv_dto.call_id).await {
Ok(call) => call.serialized_arguments.0,
Err(_) => BTreeMap::new(),
};
(inv_dto.task_id, args)
}
}