use crate::config::FunctionConfig;
use crate::pool::{RunnerPool, RunnerPoolKey, RunnerState};
use crate::provider::{FunctionProvider, HealthReport};
use camel_api::Exchange;
use camel_api::function::*;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
pub(crate) struct DefaultFunctionInvoker {
pub(crate) pool: Arc<RunnerPool>,
pub(crate) provider: Arc<dyn FunctionProvider>,
pub(crate) config: FunctionConfig,
pub(crate) pending: Mutex<Vec<(FunctionDefinition, Option<String>)>>,
pub(crate) staging: Mutex<HashMap<u64, StagedEntries>>,
pub(crate) function_timeouts: Mutex<HashMap<FunctionId, u64>>,
pub(crate) next_generation: AtomicU64,
pub(crate) current_generation: AtomicU64,
pub(crate) started: AtomicBool,
pub(crate) register_lock: tokio::sync::Mutex<()>,
}
type StagedEntries = Vec<(FunctionDefinition, Option<String>)>;
impl DefaultFunctionInvoker {
pub(crate) fn new(
pool: Arc<RunnerPool>,
provider: Arc<dyn FunctionProvider>,
config: FunctionConfig,
) -> Self {
Self {
pool,
provider,
config,
pending: Mutex::new(Vec::new()),
staging: Mutex::new(HashMap::new()),
function_timeouts: Mutex::new(HashMap::new()),
next_generation: AtomicU64::new(0),
current_generation: AtomicU64::new(0),
started: AtomicBool::new(false),
register_lock: tokio::sync::Mutex::new(()),
}
}
pub(crate) async fn wait_until_healthy(
&self,
handle: &crate::pool::RunnerHandle,
) -> Result<(), FunctionInvocationError> {
let deadline = tokio::time::Instant::now() + self.config.boot_timeout;
loop {
if tokio::time::Instant::now() > deadline {
return Err(FunctionInvocationError::RunnerUnavailable {
reason: "boot timeout".into(),
});
}
match self.provider.health(handle).await {
Ok(HealthReport::Healthy) => {
*handle.state.lock().expect("state") = RunnerState::Healthy;
return Ok(());
}
Ok(HealthReport::Unhealthy(reason)) => {
*handle.state.lock().expect("state") = RunnerState::Unhealthy {
since: std::time::Instant::now(),
reason,
};
tokio::time::sleep(self.config.health_interval).await;
}
Err(e) => {
return Err(FunctionInvocationError::RunnerUnavailable {
reason: e.to_string(),
});
}
}
}
}
}
impl FunctionInvokerSync for DefaultFunctionInvoker {
fn stage_pending(&self, def: FunctionDefinition, route_id: Option<&str>, generation: u64) {
let rid = route_id.map(ToOwned::to_owned);
if !self.started.load(Ordering::SeqCst) {
self.pending.lock().expect("pending").push((def, rid));
return;
}
let mut staging = self.staging.lock().expect("staging");
let entry = staging.entry(generation).or_default();
let did = def.id.clone();
let rid_owned = rid.clone();
entry
.retain(|(existing, existing_rid)| !(existing.id == did && existing_rid == &rid_owned));
entry.push((def, rid_owned));
}
fn discard_staging(&self, generation: u64) {
self.staging.lock().expect("staging").remove(&generation);
}
fn begin_reload(&self) -> u64 {
let generation = self.next_generation.fetch_add(1, Ordering::SeqCst) + 1;
self.current_generation.store(generation, Ordering::SeqCst);
self.staging
.lock()
.expect("staging")
.entry(generation)
.or_default();
generation
}
fn function_refs_for_route(&self, route_id: &str) -> Vec<(FunctionId, Option<String>)> {
self.pool
.function_to_key
.iter()
.filter(|kv| kv.key().1.as_deref() == Some(route_id))
.map(|kv| kv.key().clone())
.collect()
}
fn staged_refs_for_route(
&self,
route_id: &str,
generation: u64,
) -> Vec<(FunctionId, Option<String>)> {
let staging = self.staging.lock().expect("staging");
staging
.get(&generation)
.map(|entries| {
entries
.iter()
.filter(|(_, rid)| rid.as_deref() == Some(route_id))
.map(|(def, rid)| (def.id.clone(), rid.clone()))
.collect()
})
.unwrap_or_default()
}
fn staged_defs_for_route(
&self,
route_id: &str,
generation: u64,
) -> Vec<(FunctionDefinition, Option<String>)> {
let staging = self.staging.lock().expect("staging");
staging
.get(&generation)
.map(|entries| {
entries
.iter()
.filter(|(_, rid)| rid.as_deref() == Some(route_id))
.map(|(def, rid)| (def.clone(), rid.clone()))
.collect()
})
.unwrap_or_default()
}
}
#[async_trait::async_trait]
impl FunctionInvoker for DefaultFunctionInvoker {
async fn register(
&self,
def: FunctionDefinition,
route_id: Option<&str>,
) -> Result<(), FunctionInvocationError> {
let key = RunnerPoolKey {
runtime: def.runtime.clone(),
};
let _guard = self.register_lock.lock().await;
let handle = {
let existing = self.pool.handles.get(&key).map(|h| h.clone());
match existing {
Some(h) => h,
None => {
let spawned = self.provider.spawn(&key).await.map_err(|e| {
FunctionInvocationError::RunnerUnavailable {
reason: e.to_string(),
}
})?;
self.wait_until_healthy(&spawned).await?;
self.pool
.handles
.entry(key.clone())
.or_insert(spawned)
.clone()
}
}
};
self.provider
.register(&handle, &def)
.await
.map_err(|e| FunctionInvocationError::Transport(e.to_string()))?;
let ref_key = (def.id.clone(), route_id.map(ToOwned::to_owned));
self.pool
.ref_counts
.entry(ref_key.clone())
.and_modify(|c| *c += 1)
.or_insert(1);
self.function_timeouts
.lock()
.expect("function_timeouts")
.insert(def.id.clone(), def.timeout_ms);
self.pool.function_to_key.insert(ref_key, key);
Ok(())
}
async fn unregister(
&self,
id: &FunctionId,
route_id: Option<&str>,
) -> Result<(), FunctionInvocationError> {
let key = (id.clone(), route_id.map(ToOwned::to_owned));
let mut should_unregister = false;
if let Some(mut c) = self.pool.ref_counts.get_mut(&key) {
if *c > 1 {
*c -= 1;
} else {
should_unregister = true;
}
}
if should_unregister {
self.pool.ref_counts.remove(&key);
}
if should_unregister && let Some((_, pool_key)) = self.pool.function_to_key.remove(&key) {
let still_used_by_other_route =
self.pool.function_to_key.iter().any(|kv| kv.key().0 == *id);
if !still_used_by_other_route {
self.function_timeouts
.lock()
.expect("function_timeouts")
.remove(id);
if let Some(handle) = self.pool.handles.get(&pool_key) {
self.provider
.unregister(&handle, id)
.await
.map_err(|e| FunctionInvocationError::Transport(e.to_string()))?;
}
let still_used = self
.pool
.function_to_key
.iter()
.any(|kv| kv.value() == &pool_key);
if !still_used && let Some((_, handle)) = self.pool.handles.remove(&pool_key) {
handle.cancel.cancel();
self.provider
.shutdown(handle)
.await
.map_err(|e| FunctionInvocationError::Transport(e.to_string()))?;
}
}
}
Ok(())
}
async fn invoke(
&self,
id: &FunctionId,
exchange: &Exchange,
) -> Result<ExchangePatch, FunctionInvocationError> {
let key = self
.pool
.function_to_key
.iter()
.find(|kv| kv.key().0 == *id)
.map(|kv| kv.value().clone())
.ok_or_else(|| FunctionInvocationError::NotRegistered {
function_id: id.clone(),
})?;
let handle = self
.pool
.handles
.get(&key)
.map(|h| h.clone())
.ok_or_else(|| FunctionInvocationError::RunnerUnavailable {
reason: "missing handle".into(),
})?;
let state = handle.state.lock().expect("state").clone();
match state {
RunnerState::Failed { reason } => {
return Err(FunctionInvocationError::RunnerUnavailable { reason });
}
RunnerState::Unhealthy { reason, .. } => {
return Err(FunctionInvocationError::RunnerUnavailable { reason });
}
_ => {}
}
let timeout = std::time::Duration::from_millis(
self.function_timeouts
.lock()
.expect("function_timeouts")
.get(id)
.copied()
.unwrap_or(self.config.default_timeout_ms),
);
self.provider
.invoke(&handle, id, exchange, timeout)
.await
.map_err(|e| FunctionInvocationError::Transport(e.to_string()))
}
async fn prepare_reload(
&self,
diff: FunctionDiff,
generation: u64,
) -> Result<PrepareToken, FunctionInvocationError> {
let current_gen = self.current_generation.load(Ordering::SeqCst);
if generation != current_gen {
self.discard_staging(generation);
return Err(FunctionInvocationError::Transport(format!(
"stale generation: expected {}, got {}",
current_gen, generation
)));
}
{
let mut staging = self.staging.lock().expect("staging");
let before = staging.len();
staging.retain(|g, _| *g >= current_gen);
let purged = before - staging.len();
if purged > 0 {
tracing::info!(purged, "hot-reload: purged stale staging buffers");
}
}
let mut token = PrepareToken::default();
for (def, route_id) in &diff.added {
match self.register(def.clone(), route_id.as_deref()).await {
Ok(()) => {
token.registered.push((def.clone(), route_id.clone()));
}
Err(e) => {
for (reg_def, reg_rid) in &token.registered {
if let Err(unreg_err) =
self.unregister(®_def.id, reg_rid.as_deref()).await
{
tracing::warn!(
function_id = %reg_def.id,
error = %unreg_err,
"hot-reload: failed to unregister during prepare rollback"
);
}
}
self.staging.lock().expect("staging").remove(&generation);
return Err(e);
}
}
}
Ok(token)
}
async fn finalize_reload(
&self,
diff: &FunctionDiff,
generation: u64,
) -> Result<(), FunctionInvocationError> {
for (id, route_id) in &diff.removed {
self.unregister(id, route_id.as_deref()).await?;
}
self.staging.lock().expect("staging").remove(&generation);
Ok(())
}
async fn rollback_reload(
&self,
token: PrepareToken,
generation: u64,
) -> Result<(), FunctionInvocationError> {
for (def, route_id) in &token.registered {
if let Err(e) = self.unregister(&def.id, route_id.as_deref()).await {
tracing::warn!(
function_id = %def.id,
error = %e,
"hot-reload: failed to unregister during rollback"
);
}
}
self.staging.lock().expect("staging").remove(&generation);
Ok(())
}
async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
let entries = self.staging.lock().expect("staging").remove(&0);
if let Some(entries) = entries {
let mut committed: Vec<(FunctionId, Option<String>)> = Vec::new();
for (def, route_id) in entries {
match self.register(def.clone(), route_id.as_deref()).await {
Ok(()) => committed.push((def.id, route_id)),
Err(e) => {
for (id, rid) in committed.iter().rev() {
let _ = self.unregister(id, rid.as_deref()).await;
}
return Err(e);
}
}
}
}
Ok(())
}
}