use chrono::Utc;
use rustvello_core::error::{RustvelloError, RustvelloResult};
use rustvello_core::orchestrator::ActiveRunnerInfo;
use rustvello_proto::call::{CallDTO, SerializedArguments};
use rustvello_proto::identifiers::{CallId, InvocationId, RunnerId};
use rustvello_proto::invocation::{InvocationDTO, InvocationHistory};
use rustvello_proto::status::{ConcurrencyControlType, InvocationStatus};
use super::OrchestratorCoordinator;
#[derive(Debug)]
#[non_exhaustive]
pub enum RouteCallResult {
New(InvocationId),
Reused(InvocationId),
ReusedDifferentCall {
invocation_id: InvocationId,
existing_call_id: CallId,
},
}
impl OrchestratorCoordinator {
pub async fn route_call(
&self,
new_invocation_id: &InvocationId,
call_dto: &CallDTO,
cc_args: Option<&SerializedArguments>,
registration_cc: ConcurrencyControlType,
index_cc: bool,
runner_id: &RunnerId,
) -> RustvelloResult<RouteCallResult> {
if registration_cc == ConcurrencyControlType::Unlimited {
return self
.create_and_route_invocation(
new_invocation_id,
call_dto,
cc_args,
index_cc,
runner_id,
)
.await
.map(RouteCallResult::New);
}
let existing = self
.orchestrator
.get_existing_invocations(&call_dto.task_id, cc_args, &[InvocationStatus::Registered])
.await?;
if let Some(existing_inv_id) = existing.into_iter().next() {
let existing_inv = self.state_backend.get_invocation(&existing_inv_id).await?;
if existing_inv.call_id == call_dto.call_id {
return Ok(RouteCallResult::Reused(existing_inv_id));
}
return Ok(RouteCallResult::ReusedDifferentCall {
invocation_id: existing_inv_id,
existing_call_id: existing_inv.call_id,
});
}
self.create_and_route_invocation(new_invocation_id, call_dto, cc_args, index_cc, runner_id)
.await
.map(RouteCallResult::New)
}
async fn create_and_route_invocation(
&self,
invocation_id: &InvocationId,
call_dto: &CallDTO,
cc_args: Option<&SerializedArguments>,
index_cc: bool,
runner_id: &RunnerId,
) -> RustvelloResult<InvocationId> {
let inv_dto = InvocationDTO::new(
invocation_id.clone(),
call_dto.task_id.clone(),
call_dto.call_id.clone(),
);
self.state_backend
.upsert_invocation(&inv_dto, call_dto)
.await?;
let record = self
.orchestrator
.register_invocation_with_id(invocation_id, call_dto, Some(runner_id))
.await?;
let history = InvocationHistory::new(invocation_id.clone(), record.clone(), None)
.with_runner(runner_id.clone());
self.state_backend.add_history(&history).await?;
if let Some(ref tm) = self.trigger_manager {
let ctx = rustvello_proto::trigger::StatusContext {
invocation_id: invocation_id.clone(),
task_id: call_dto.task_id.clone(),
status: record.status,
arguments: call_dto.serialized_arguments.0.clone(),
};
tm.report_status_change(&ctx).await?;
}
if index_cc {
self.orchestrator
.index_for_concurrency_control(invocation_id, &call_dto.task_id, cc_args)
.await?;
}
self.broker.route_invocation(invocation_id).await?;
Ok(invocation_id.clone())
}
pub async fn reroute_invocations(
&self,
invocation_ids: &[InvocationId],
runner_id: &RunnerId,
) -> RustvelloResult<()> {
for inv_id in invocation_ids {
match self
.orchestrator
.set_invocation_status(inv_id, InvocationStatus::Rerouted, Some(runner_id))
.await
{
Ok(record) => {
let history = InvocationHistory::new(inv_id.clone(), record.clone(), None)
.with_runner(runner_id.clone());
let _ = self.state_backend.add_history(&history).await;
if let Some(ref tm) = self.trigger_manager {
let (task_id, arguments) = self.get_trigger_context(inv_id).await;
let ctx = rustvello_proto::trigger::StatusContext {
invocation_id: inv_id.clone(),
task_id,
status: InvocationStatus::Rerouted,
arguments,
};
let _ = tm.report_status_change(&ctx).await;
}
self.broker.route_invocation(inv_id).await?
}
Err(RustvelloError::InvalidStatusTransition { .. }) => {
}
Err(e) => return Err(e),
}
}
Ok(())
}
pub async fn trigger_loop_iteration(
&self,
runner_id: &RunnerId,
) -> RustvelloResult<Vec<InvocationId>> {
let tm = match self.trigger_manager {
Some(ref tm) => tm,
None => return Ok(Vec::new()),
};
let _ = tm.evaluate_cron_conditions().await?;
let to_invoke = tm.evaluate_triggers().await?;
let mut created_ids = Vec::new();
for (trigger_def, args_value) in &to_invoke {
let args = json_value_to_serialized_args(args_value);
let call_dto = CallDTO::new(trigger_def.task_id.clone(), args);
let inv_id = InvocationId::new();
let inv_dto = InvocationDTO::new(
inv_id.clone(),
trigger_def.task_id.clone(),
call_dto.call_id.clone(),
);
self.state_backend
.upsert_invocation(&inv_dto, &call_dto)
.await?;
let record = self
.orchestrator
.register_invocation_with_id(&inv_id, &call_dto, Some(runner_id))
.await?;
let history = InvocationHistory::new(inv_id.clone(), record.clone(), None)
.with_runner(runner_id.clone());
if let Err(e) = self.state_backend.add_history(&history).await {
tracing::warn!("trigger_loop_iteration: failed to record history: {e}");
}
self.broker.route_invocation(&inv_id).await?;
created_ids.push(inv_id);
}
Ok(created_ids)
}
pub async fn check_atomic_services(
&self,
runner_id: &RunnerId,
service_interval_minutes: f64,
spread_margin_minutes: f64,
runner_timeout_seconds: f64,
) -> RustvelloResult<Option<Vec<InvocationId>>> {
self.orchestrator
.register_heartbeat(runner_id, true)
.await?;
let active_runners = self
.orchestrator
.get_active_runners(runner_timeout_seconds as u64, Some(true))
.await?;
let now = Utc::now().timestamp() as f64
+ Utc::now().timestamp_subsec_nanos() as f64 / 1_000_000_000.0;
if !can_run_atomic_service(
runner_id,
&active_runners,
now,
service_interval_minutes,
spread_margin_minutes,
) {
return Ok(None);
}
let start = Utc::now();
let created_ids = self.trigger_loop_iteration(runner_id).await?;
let end = Utc::now();
self.orchestrator
.record_atomic_service_execution(runner_id, start, end)
.await?;
Ok(Some(created_ids))
}
}
fn can_run_atomic_service(
runner_id: &RunnerId,
active_runners: &[ActiveRunnerInfo],
current_time: f64,
service_interval_minutes: f64,
spread_margin_minutes: f64,
) -> bool {
if active_runners.is_empty() {
return false;
}
let total_runners = active_runners.len();
if total_runners == 1 {
return true;
}
let runner_position = active_runners
.iter()
.position(|r| r.runner_id == *runner_id);
let runner_position = match runner_position {
Some(pos) => pos,
None => return false,
};
let service_interval = service_interval_minutes * 60.0;
let spread_margin = spread_margin_minutes * 60.0;
let time_slot_size = service_interval / total_runners as f64;
let start_time = runner_position as f64 * time_slot_size;
let mut end_time = start_time + time_slot_size - spread_margin;
if end_time <= start_time {
end_time = start_time + (time_slot_size / 2.0);
}
let time_in_cycle = current_time % service_interval;
start_time <= time_in_cycle && time_in_cycle < end_time
}
fn json_value_to_serialized_args(value: &serde_json::Value) -> SerializedArguments {
let mut args = SerializedArguments::new();
if let serde_json::Value::Object(map) = value {
for (k, v) in map {
let v_str = match v {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
};
args.insert(k.clone(), v_str);
}
}
args
}