use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use dynamo_llm::local_model::LocalModel;
use dynamo_llm::local_model::LocalModelBuilder;
use dynamo_llm::local_model::runtime_config::{
DisaggregatedEndpoint, ModelRuntimeConfig, StructuralTagMode, StructuralTagSchemaMode,
StructuralTagScope,
};
use dynamo_llm::model_type::{ModelInput, ModelType};
use dynamo_llm::worker_type::WorkerType;
use dynamo_runtime::engine_routes::EngineRouteCallback;
use dynamo_runtime::pipeline::network::Ingress;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::{DistributedRuntime, Runtime};
use tokio_util::sync::CancellationToken;
use crate::adapter::EngineAdapter;
use crate::disagg::DisaggregationMode;
use crate::engine::{EngineConfig, LLMEngine};
use crate::error::{BackendError, DynamoError, ErrorType};
use crate::publisher::{PublisherHandles, setup_publishers};
/// Grace period, in seconds, used when `GRACE_PERIOD_ENV` is unset, empty, or cannot be parsed.
const DEFAULT_GRACE_PERIOD_SECS: f64 = 5.0;
/// Name of the environment variable that overrides the pre-drain grace period (seconds, parsed as `f64`).
const GRACE_PERIOD_ENV: &str = "DYN_GRACEFUL_SHUTDOWN_GRACE_PERIOD_SECS";
/// Name of the environment variable holding a health-check payload override:
/// either inline JSON, or `@<path>` to read the JSON from a file.
const HEALTH_CHECK_PAYLOAD_ENV: &str = "DYN_HEALTH_CHECK_PAYLOAD";
/// Optional overrides for the runtime's discovery backend, request plane, and event plane.
///
/// A field left as `None` means "no override" and is skipped by
/// [`RuntimeConfig::apply_to_env`].
#[derive(Clone, Debug, Default)]
pub struct RuntimeConfig {
    /// Exported as `DYN_DISCOVERY_BACKEND` when set.
    pub discovery_backend: Option<String>,
    /// Exported as `DYN_REQUEST_PLANE` when set.
    pub request_plane: Option<String>,
    /// Exported as `DYN_EVENT_PLANE` when set.
    pub event_plane: Option<String>,
}

impl RuntimeConfig {
    /// Returns `true` when at least one of the three overrides is set.
    pub fn has_overrides(&self) -> bool {
        let slots = [
            &self.discovery_backend,
            &self.request_plane,
            &self.event_plane,
        ];
        slots.iter().any(|slot| slot.is_some())
    }

    /// Writes every override that is set into the process environment.
    ///
    /// Unset overrides leave the corresponding variable untouched.
    pub fn apply_to_env(&self) {
        let overrides = [
            ("DYN_DISCOVERY_BACKEND", &self.discovery_backend),
            ("DYN_REQUEST_PLANE", &self.request_plane),
            ("DYN_EVENT_PLANE", &self.event_plane),
        ];
        for (key, slot) in overrides {
            if let Some(value) = slot {
                // SAFETY: mutating the environment is only sound while no other
                // thread reads or writes it concurrently. NOTE(review): this
                // presumably runs during single-threaded startup — confirm at
                // the call sites.
                unsafe {
                    std::env::set_var(key, value);
                }
            }
        }
    }
}
/// Static configuration for a [`Worker`]: where it registers, what it serves,
/// and which optional features are enabled.
#[derive(Clone, Debug)]
pub struct WorkerConfig {
/// Runtime namespace in which the worker's component is resolved.
pub namespace: String,
/// Component name within `namespace`; also passed to the engine's metrics setup.
pub component: String,
/// Endpoint name on the component that requests are served from.
pub endpoint: String,
/// Model source. When non-empty it is used as a local path if it exists on
/// disk, otherwise it is fetched via `LocalModel::fetch`.
pub model_name: String,
/// Name to serve the model under; when `None`, the engine's served name and
/// then the engine's model name are used.
pub served_model_name: Option<String>,
/// Request input kind. `Worker::run` accepts only `ModelInput::Tokens`.
pub model_input: ModelInput,
/// Comma-separated endpoint types (`chat`, `completions`, `embedding(s)`,
/// `tensor`, `prefill`). Ignored when the worker runs in prefill mode.
pub endpoint_types: String,
/// Optional template path forwarded to the local model builder.
pub custom_jinja_template: Option<PathBuf>,
/// Tool-call parser name copied into the published runtime config.
pub tool_call_parser: Option<String>,
/// Reasoning parser name copied into the published runtime config.
pub reasoning_parser: Option<String>,
/// Copied verbatim into the published runtime config.
pub exclude_tools_when_tool_choice_none: bool,
/// Requests the local indexer; forced off for decode workers
/// (see `effective_enable_local_indexer`).
pub enable_local_indexer: bool,
/// When `false`, the KV/snapshot publishers are not started.
pub enable_kv_routing: bool,
/// Extra metric labels handed to the endpoint builder; an empty list is passed as `None`.
pub metrics_labels: Vec<(String, String)>,
/// Aggregated, prefill, or decode role of this worker.
pub disaggregation_mode: DisaggregationMode,
/// Explicit health-check payload; takes precedence over the environment
/// override and the engine-provided payload.
pub health_check_payload: Option<serde_json::Value>,
/// Copied verbatim into the published runtime config.
pub structural_tag_mode: StructuralTagMode,
/// Copied verbatim into the published runtime config.
pub structural_tag_scope: StructuralTagScope,
/// Copied verbatim into the published runtime config.
pub structural_tag_schema: StructuralTagSchemaMode,
/// Discovery / request-plane / event-plane overrides.
pub runtime: RuntimeConfig,
}
impl WorkerConfig {
    /// Returns whether the local indexer should actually be enabled.
    ///
    /// The indexer is on only when it was requested and the worker is not
    /// running in decode mode.
    pub(crate) fn effective_enable_local_indexer(&self) -> bool {
        if !self.enable_local_indexer {
            return false;
        }
        !self.disaggregation_mode.is_decode()
    }
}
impl Default for WorkerConfig {
    /// Builds the baseline configuration: an aggregated, token-input worker
    /// registered as `dynamo.backend.generate` serving chat and completions,
    /// with KV routing and the local indexer enabled and no model name set.
    fn default() -> Self {
        Self {
            // Registration target.
            namespace: String::from("dynamo"),
            component: String::from("backend"),
            endpoint: String::from("generate"),
            // Model identity and request shape.
            model_name: String::new(),
            served_model_name: None,
            model_input: ModelInput::Tokens,
            endpoint_types: String::from("chat,completions"),
            // Template and parser options.
            custom_jinja_template: None,
            tool_call_parser: None,
            reasoning_parser: None,
            exclude_tools_when_tool_choice_none: true,
            // Routing, metrics, and disaggregation.
            enable_local_indexer: true,
            enable_kv_routing: true,
            metrics_labels: Vec::new(),
            disaggregation_mode: DisaggregationMode::Aggregated,
            health_check_payload: None,
            // Structural tag handling.
            structural_tag_mode: StructuralTagMode::Off,
            structural_tag_scope: StructuralTagScope::Auto,
            structural_tag_schema: StructuralTagSchemaMode::Auto,
            runtime: RuntimeConfig::default(),
        }
    }
}
/// Engine lifecycle as tracked by [`Worker`]; it gates whether `cleanup_once`
/// calls into the engine.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LifecycleState {
/// Freshly constructed; `engine.start()` has not been attempted.
Init,
/// `engine.start()` returned successfully.
Running,
/// `engine.start()` returned an error; engine cleanup is still run from this state.
StartFailed,
/// Cleanup has run (or was not needed); further cleanup calls do nothing.
Stopped,
}
/// Drives one engine through start, serving, and shutdown on the distributed runtime.
pub struct Worker {
/// The engine being served.
engine: Arc<dyn LLMEngine>,
/// Static worker configuration.
config: WorkerConfig,
/// Current lifecycle state; starts at `LifecycleState::Init`.
state: LifecycleState,
/// Publisher handles, present once KV-aware-routing publishers were started;
/// cleared again during cleanup.
publishers: Option<PublisherHandles>,
/// Lifecycle gauges, present once publishing setup succeeded; used to record
/// drain and cleanup durations.
lifecycle: Option<crate::metrics::LifecycleGauges>,
}
impl Worker {
/// Creates a worker in the `Init` state for `engine`.
///
/// No publishers or lifecycle gauges exist until the worker is run.
pub fn new(engine: Arc<dyn LLMEngine>, config: WorkerConfig) -> Self {
    Self {
        state: LifecycleState::Init,
        publishers: None,
        lifecycle: None,
        engine,
        config,
    }
}
/// Runs the worker until serving ends or a shutdown signal arrives.
///
/// Installs SIGTERM and SIGINT handlers, drives `run_inner`, and once a signal
/// is observed gives `run_inner` a bounded window (shutdown timeout plus grace
/// period) to finish. If that window elapses, the process is terminated with
/// `std::process::exit`. On every returning path `cleanup_once` runs before the
/// outcome of `run_inner` is handed back.
///
/// # Errors
///
/// Returns an error when the configured model input is unsupported, a signal
/// handler cannot be installed, or `run_inner` fails.
pub async fn run(mut self, runtime: Runtime) -> Result<(), DynamoError> {
// Reject unsupported input kinds before installing handlers or touching the engine.
validate_model_input(self.config.model_input)?;
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("install SIGTERM handler: {e}"),
)
})?;
let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("install SIGINT handler: {e}"),
)
})?;
let shutdown_token = CancellationToken::new();
let signal_token = shutdown_token.clone();
// Background task: the first SIGTERM or SIGINT cancels the shared token.
let signal_handle = tokio::spawn(async move {
tokio::select! {
_ = sigterm.recv() => tracing::info!("SIGTERM received"),
_ = sigint.recv() => tracing::info!("SIGINT received"),
}
signal_token.cancel();
});
let outcome = {
let inner_fut = self.run_inner(runtime, &shutdown_token);
// Pinned so the same future can keep being polled after the select below.
tokio::pin!(inner_fut);
tokio::select! {
result = &mut inner_fut => result,
_ = shutdown_token.cancelled() => {
// Shutdown requested: keep driving `run_inner`, but only up to the deadline.
let timeout = graceful_shutdown_timeout();
let grace = grace_period_secs();
let deadline = shutdown_deadline(timeout, grace);
tracing::debug!(
"graceful shutdown started; deadline {}s ({}s timeout + {:.2}s grace)",
deadline.as_secs(),
timeout.as_secs(),
grace,
);
match tokio::time::timeout(deadline, &mut inner_fut).await {
Ok(result) => result,
Err(_) => {
tracing::error!(
"Graceful shutdown exceeded {}s; force-exiting with code 911. \
Set DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to override.",
deadline.as_secs()
);
// NOTE(review): on Unix a parent process observes only the low 8 bits of
// the exit status, so 911 would presumably surface as 143 — confirm this
// is the intended code.
std::process::exit(911);
}
}
}
}
};
// The signal task is no longer needed; abort it and wait for it to wind down.
signal_handle.abort();
let _ = signal_handle.await;
// A no-op if cleanup already ran or the engine was never started.
self.cleanup_once().await;
outcome
}
/// Connects to the distributed runtime, starts the engine, sets up publishing,
/// and serves until shutdown.
///
/// The shutdown token is checked twice before serving: once before
/// `engine.start()` (returning immediately) and once after publishing setup
/// (running the shutdown orchestration instead of serving).
///
/// # Errors
///
/// Returns an error when the runtime or component cannot be set up, the engine
/// fails to start, metrics/publisher setup fails, or serving fails.
async fn run_inner(
&mut self,
runtime: Runtime,
shutdown: &CancellationToken,
) -> Result<(), DynamoError> {
let drt = DistributedRuntime::from_settings(runtime)
.await
.map_err(|e| {
err(
ErrorType::Backend(BackendError::CannotConnect),
format!("distributed runtime: {e}"),
)
})?;
tracing::debug!("distributed runtime connected");
let component = drt
.namespace(&self.config.namespace)
.and_then(|ns| ns.component(&self.config.component))
.map_err(|e| {
err(
ErrorType::Backend(BackendError::CannotConnect),
format!("component: {e}"),
)
})?;
let endpoint = component.endpoint(&self.config.endpoint);
tracing::debug!(
namespace = %self.config.namespace,
component = %self.config.component,
endpoint = %self.config.endpoint,
"component and endpoint resolved"
);
// Nothing has been started yet, so an early shutdown needs no orchestration.
if shutdown.is_cancelled() {
tracing::info!("Shutdown signal observed before engine.start(); exiting cleanly");
return Ok(());
}
// The runtime's connection id doubles as the worker id handed to the engine.
let worker_id = drt.connection_id();
let engine_start = std::time::Instant::now();
let engine_config = self.start_engine(worker_id).await?;
// Wall-clock duration of `engine.start()`, reported as the model load time.
let model_load_time_seconds = engine_start.elapsed().as_secs_f64();
tracing::debug!(
model = %engine_config.model,
worker_id,
model_load_time_seconds,
"engine.start() complete"
);
let engine_metrics =
crate::metrics::EngineMetrics::with_engine_config(endpoint.clone(), &engine_config);
let lifecycle =
crate::metrics::LifecycleGauges::new(&engine_metrics, model_load_time_seconds)?;
self.setup_publishing(
&component,
&engine_config,
&engine_metrics,
model_load_time_seconds,
lifecycle,
)
.await?;
// The engine is running by now, so a shutdown seen here still goes through
// the unregister / drain / cleanup steps rather than returning directly.
if shutdown.is_cancelled() {
tracing::info!("Shutdown signal observed during engine.start(); running orchestrator");
self.orchestrator_steps(&endpoint).await;
return Ok(());
}
self.serve_with_orchestrator(&engine_config, endpoint, shutdown.clone())
.await
}
/// Wires up engine metrics and, when applicable, the KV-aware-routing publishers.
///
/// Publishers are started only when KV routing is enabled and the engine
/// reports at least one KV event source or snapshot dp rank. The lifecycle
/// gauges are stored on every successful path; on error nothing is stored.
///
/// # Errors
///
/// Propagates failures from the engine's metrics setup, its KV event source
/// lookup, and publisher setup.
async fn setup_publishing(
    &mut self,
    component: &dynamo_runtime::component::Component,
    engine_config: &EngineConfig,
    engine_metrics: &crate::metrics::EngineMetrics,
    model_load_time_seconds: f64,
    lifecycle: crate::metrics::LifecycleGauges,
) -> Result<(), DynamoError> {
    let bindings = self
        .engine
        .setup_metrics(crate::engine::MetricsCtx {
            model: &engine_config.model,
            component: &self.config.component,
            model_load_time_seconds,
            metrics: engine_metrics,
        })
        .await?;

    if !self.config.enable_kv_routing {
        tracing::debug!("enable_kv_routing=false; skipping kv/snapshot publishers");
    } else {
        let sources = self.engine.kv_event_sources().await?;
        if sources.is_empty() && bindings.dp_ranks.is_empty() {
            tracing::debug!("engine returned no KV sources / dp_ranks; KV-aware routing disabled");
        } else {
            let enable_local_indexer = self.config.effective_enable_local_indexer();
            tracing::debug!(
                kv_sources = sources.len(),
                snapshot_dp_ranks = bindings.dp_ranks.len(),
                enable_local_indexer,
                kv_cache_block_size = ?engine_config.kv_cache_block_size,
                "Starting KV-aware-routing publishers"
            );
            let started = setup_publishers(
                component,
                engine_metrics,
                sources,
                bindings.dp_ranks,
                bindings.on_publisher_ready,
                engine_config.kv_cache_block_size,
                enable_local_indexer,
            )
            .await?;
            self.publishers = Some(started);
        }
    }

    // Reached only when every fallible step above succeeded.
    self.lifecycle = Some(lifecycle);
    Ok(())
}
/// Registers one engine route per management control the engine supports.
///
/// Every callback is wrapped with the discovery policy for its control name,
/// and all wrapped callbacks share a single async mutex.
///
/// # Errors
///
/// Propagates a failure from `engine.supported_controls()`.
async fn register_engine_controls(
    &self,
    endpoint: &dynamo_runtime::component::Endpoint,
) -> Result<(), DynamoError> {
    let names = self.engine.supported_controls().await?;
    if names.is_empty() {
        tracing::debug!("engine returned no management controls");
        return Ok(());
    }

    let routes = endpoint.drt().engine_routes();
    let control_count = names.len();
    // One lock shared by every wrapped callback registered below.
    let shared_lock = Arc::new(tokio::sync::Mutex::new(()));

    for name in names {
        let raw = engine_control_callback(name.clone(), self.engine.clone());
        let wrapped =
            wrap_engine_control_callback(name.clone(), raw, endpoint.clone(), shared_lock.clone());
        routes.register(&name, wrapped);
    }

    tracing::info!(control_count, "registered engine management controls");
    Ok(())
}
async fn orchestrator_steps(&mut self, endpoint: &dynamo_runtime::component::Endpoint) {
if let Err(e) = endpoint.unregister_endpoint_instance().await {
tracing::warn!(error = %e, "discovery unregister failed");
} else {
tracing::info!("Endpoint unregistered from discovery");
}
self.run_engine_shutdown_steps().await;
}
/// Starts the engine and records the resulting lifecycle transition:
/// `Init -> Running` on success, `Init -> StartFailed` on error.
///
/// # Panics
///
/// Panics when the worker is not in `LifecycleState::Init`.
///
/// # Errors
///
/// Returns the error produced by `engine.start()` unchanged.
async fn start_engine(&mut self, worker_id: u64) -> Result<EngineConfig, DynamoError> {
    assert_eq!(
        self.state,
        LifecycleState::Init,
        "start_engine called in unexpected state {:?}",
        self.state
    );
    let started = self.engine.start(worker_id).await;
    self.state = if started.is_ok() {
        LifecycleState::Running
    } else {
        LifecycleState::StartFailed
    };
    started
}
async fn cleanup_once(&mut self) {
match self.state {
LifecycleState::Init | LifecycleState::Stopped => {
self.state = LifecycleState::Stopped;
return;
}
LifecycleState::Running | LifecycleState::StartFailed => {}
}
let cleanup_start = std::time::Instant::now();
match self.engine.cleanup().await {
Ok(()) => tracing::info!("Engine cleanup complete"),
Err(e) => tracing::error!(error = %e, "engine cleanup failed"),
}
let cleanup_elapsed = cleanup_start.elapsed().as_secs_f64();
if let Some(lifecycle) = self.lifecycle.as_ref() {
lifecycle.observe_cleanup_time(cleanup_elapsed);
}
self.publishers = None;
self.state = LifecycleState::Stopped;
}
/// Registers the model, starts the endpoint, and serves until the endpoint
/// finishes or shutdown is requested, then runs the shutdown orchestration.
///
/// Steps, in order: resolve model/worker types, build and attach the local
/// model, register engine controls, build the ingress, pick the health-check
/// payload, start the endpoint, and wait.
///
/// # Errors
///
/// Returns an error when type resolution, model building/attaching, control
/// registration, ingress construction, local-engine registration, or serving
/// fails. A serve error returns immediately, without running the
/// orchestration steps.
async fn serve_with_orchestrator(
&mut self,
engine_config: &EngineConfig,
endpoint: dynamo_runtime::component::Endpoint,
shutdown: CancellationToken,
) -> Result<(), DynamoError> {
let model_type = resolve_model_type(&self.config)?;
let (worker_type, needs) = resolve_worker_type_and_needs(&self.config);
let mut local_model = build_local_model(&self.config, engine_config).await?;
tracing::debug!("local model built");
local_model
.attach(
&endpoint,
model_type,
self.config.model_input,
None,
Some(worker_type),
needs,
)
.await
.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("model attach: {e}"),
)
})?;
tracing::debug!("model registered with discovery");
self.register_engine_controls(&endpoint).await?;
// Name used for logging only: configured name, engine-served name, then engine model.
let served = resolve_served_name(&self.config, engine_config)
.unwrap_or_else(|| engine_config.model.clone());
tracing::info!(
"Serving {} on {}.{}.{}",
served,
self.config.namespace,
self.config.component,
self.config.endpoint
);
let engine_adapter = Arc::new(EngineAdapter::new(
self.engine.clone(),
self.config.disaggregation_mode,
));
let ingress = Ingress::for_engine(engine_adapter.clone()).map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("ingress: {e}"),
)
})?;
// An empty label list is passed on as "no labels".
let metrics_labels = if self.config.metrics_labels.is_empty() {
None
} else {
Some(self.config.metrics_labels.clone())
};
// Held until this function returns.
// NOTE(review): presumably keeps the runtime's graceful shutdown waiting on
// the orchestration below — confirm against `register_graceful_task`.
let _orchestrator_registration = endpoint.drt().register_graceful_task();
// Health-check payload precedence: explicit config (moved out of the config
// here), then the environment override, then the engine's own payload. An
// engine error only disables the canary for this endpoint.
let probe = match std::mem::take(&mut self.config.health_check_payload)
.or_else(load_health_check_payload_from_env)
{
Some(p) => stamp_canary_marker(p),
None => self
.engine
.health_check_payload()
.await
.unwrap_or_else(|e| {
tracing::warn!(
error = %e,
"engine.health_check_payload() failed; canary disabled for this endpoint",
);
None
})
.and_then(stamp_canary_marker),
};
let mut builder = endpoint
.endpoint_builder()
.handler(ingress)
.metrics_labels(metrics_labels)
.graceful_shutdown(true);
// The probe payload and the local engine adapter are registered together,
// and only when a usable payload exists.
if let Some(payload) = probe {
builder = builder.health_check_payload(payload);
builder = builder
.register_local_engine(Arc::new(crate::adapter::JsonProbeAdapter::new(
engine_adapter,
)))
.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("register_local_engine: {e}"),
)
})?;
}
let serve_fut = builder.start();
tokio::pin!(serve_fut);
tokio::select! {
// `biased` polls the branches in declaration order, so a finished serve
// future is observed before the shutdown token.
biased;
result = &mut serve_fut => {
match result {
Ok(()) => {
tracing::info!(
"Endpoint completed gracefully; running shutdown orchestration"
);
}
Err(e) => {
return Err(err(
ErrorType::Backend(BackendError::Unknown),
format!("serve: {e}"),
));
}
}
}
_ = shutdown.cancelled() => {
tracing::info!("Received shutdown signal; running graceful orchestration");
}
}
self.orchestrator_steps(&endpoint).await;
Ok(())
}
/// Runs the engine shutdown steps using the grace period taken from the environment.
async fn run_engine_shutdown_steps(&mut self) {
    let grace = grace_period_secs();
    self.run_engine_shutdown_steps_with_grace(grace).await
}
async fn run_engine_shutdown_steps_with_grace(&mut self, grace: f64) {
if grace > 0.0 {
tracing::info!("Grace period {:.2}s before drain", grace);
tokio::time::sleep(Duration::from_secs_f64(grace)).await;
}
let drain_start = std::time::Instant::now();
if let Err(e) = self.engine.drain().await {
tracing::warn!(error = %e, "engine drain failed");
}
let drain_elapsed = drain_start.elapsed().as_secs_f64();
if let Some(lifecycle) = self.lifecycle.as_ref() {
lifecycle.observe_drain_time(drain_elapsed);
}
self.cleanup_once().await;
}
}
/// Returns the graceful-shutdown timeout.
///
/// The value comes from the `DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT` environment
/// variable when it parses as a whole number of seconds; otherwise the
/// runtime's debug or release default is used, depending on the build profile.
fn graceful_shutdown_timeout() -> Duration {
    use dynamo_runtime::config::environment_names::worker as env_worker;
    use dynamo_runtime::worker::{
        DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG, DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE,
    };

    let fallback = if cfg!(debug_assertions) {
        DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
    } else {
        DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
    };
    let configured = std::env::var(env_worker::DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());

    match configured {
        Some(secs) => Duration::from_secs(secs),
        None => Duration::from_secs(fallback),
    }
}
/// Computes the overall shutdown deadline: `timeout` plus the grace period.
///
/// * `timeout` - the graceful-shutdown timeout.
/// * `grace_secs` - the grace period in seconds; zero, negative, and NaN
///   values contribute nothing.
///
/// The sum saturates at `Duration::MAX`. A grace period too large to be
/// represented as a `Duration` (including `+inf`) also saturates instead of
/// panicking, which `Duration::from_secs_f64` would do for such input.
fn shutdown_deadline(timeout: Duration, grace_secs: f64) -> Duration {
    let grace = if grace_secs > 0.0 {
        // The only possible failure here is overflow: negative and NaN inputs
        // never pass the `> 0.0` guard above.
        Duration::try_from_secs_f64(grace_secs).unwrap_or(Duration::MAX)
    } else {
        Duration::ZERO
    };
    timeout.saturating_add(grace)
}
/// Marks a health-check payload as a canary request.
///
/// When `value` is a JSON object, the `HEALTH_CHECK_KEY` entry is set to `true`
/// and the payload is returned. Any other JSON value is logged and yields
/// `None`, which disables the canary.
fn stamp_canary_marker(mut value: serde_json::Value) -> Option<serde_json::Value> {
    if let Some(fields) = value.as_object_mut() {
        fields.insert(
            crate::engine::HEALTH_CHECK_KEY.to_string(),
            serde_json::Value::Bool(true),
        );
        return Some(value);
    }
    tracing::warn!(
        ?value,
        "health_check_payload override is not a JSON object; canary disabled"
    );
    None
}
/// Loads a health-check payload override from the `HEALTH_CHECK_PAYLOAD_ENV`
/// environment variable.
///
/// The variable holds either inline JSON or `@<path>` naming a file that
/// contains the JSON. Returns `None` when the variable is unset or empty, when
/// reading or parsing fails, or when the parsed value is not a JSON object;
/// the last two cases are logged as warnings.
fn load_health_check_payload_from_env() -> Option<serde_json::Value> {
    let raw = match std::env::var(HEALTH_CHECK_PAYLOAD_ENV) {
        Ok(text) if !text.is_empty() => text,
        _ => return None,
    };

    let parsed: Result<serde_json::Value, String> = match raw.strip_prefix('@') {
        // `@path`: the JSON lives in a file.
        Some(path) => std::fs::read_to_string(path)
            .map_err(|e| format!("read {path}: {e}"))
            .and_then(|text| serde_json::from_str(&text).map_err(|e| e.to_string())),
        // Otherwise the variable itself is the JSON document.
        None => serde_json::from_str(&raw).map_err(|e| e.to_string()),
    };

    let payload = match parsed {
        Ok(payload) => payload,
        Err(e) => {
            tracing::warn!(env = HEALTH_CHECK_PAYLOAD_ENV, error = %e, "parse failed");
            return None;
        }
    };
    if payload.is_object() {
        Some(payload)
    } else {
        tracing::warn!(
            env = HEALTH_CHECK_PAYLOAD_ENV,
            "value must be a JSON object"
        );
        None
    }
}
/// Returns the pre-drain grace period in seconds.
///
/// Read from the `GRACE_PERIOD_ENV` environment variable:
///
/// * unset or empty: `DEFAULT_GRACE_PERIOD_SECS`;
/// * zero or negative (including `-inf`): `0.0`;
/// * a positive value representable as a `Duration`: that value;
/// * anything else (unparsable text, NaN, `+inf`, or a value too large for a
///   `Duration`): a warning is logged and `DEFAULT_GRACE_PERIOD_SECS` is used.
///
/// The returned value is therefore always safe to pass to
/// `Duration::from_secs_f64`, which panics on negative, non-finite, or
/// overflowing input.
fn grace_period_secs() -> f64 {
    let raw = match std::env::var(GRACE_PERIOD_ENV) {
        Ok(s) if !s.is_empty() => s,
        _ => return DEFAULT_GRACE_PERIOD_SECS,
    };
    match raw.parse::<f64>() {
        // Non-positive values mean "no grace period".
        Ok(v) if v <= 0.0 => 0.0,
        // `f64::from_str` accepts "inf", "NaN", and magnitudes beyond what a
        // `Duration` can hold; accept only values that convert cleanly.
        Ok(v) if std::time::Duration::try_from_secs_f64(v).is_ok() => v,
        _ => {
            tracing::warn!(
                "Invalid {}={:?}; using default {}",
                GRACE_PERIOD_ENV,
                raw,
                DEFAULT_GRACE_PERIOD_SECS
            );
            DEFAULT_GRACE_PERIOD_SECS
        }
    }
}
/// How an engine management control interacts with discovery registration.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EngineControlPolicy {
    /// Invoke the engine callback as-is.
    Direct,
    /// Unregister the endpoint instance before invoking the callback.
    UnregisterBefore,
    /// Re-register the endpoint instance after the callback succeeds.
    RegisterAfter,
}

/// Maps a control name to its discovery policy.
///
/// Matching is exact and case-sensitive; unknown names map to `Direct`.
fn engine_control_policy(control: &str) -> EngineControlPolicy {
    if matches!(control, "sleep" | "release_memory_occupation") {
        EngineControlPolicy::UnregisterBefore
    } else if matches!(control, "wake_up" | "resume_memory_occupation") {
        EngineControlPolicy::RegisterAfter
    } else {
        EngineControlPolicy::Direct
    }
}
/// Reports whether an engine control response signals failure.
///
/// A response is an error when its `"status"` is the string `"error"`
/// (ASCII case-insensitive) or its `"success"` is the boolean `false`.
fn control_response_is_error(value: &serde_json::Value) -> bool {
    let status_says_error = matches!(
        value.get("status").and_then(|field| field.as_str()),
        Some(status) if status.eq_ignore_ascii_case("error")
    );
    if status_says_error {
        return true;
    }
    matches!(
        value.get("success").and_then(|field| field.as_bool()),
        Some(false)
    )
}
/// Builds the error response for engine controls: a JSON object with
/// `"status": "error"` and the given `"message"`.
fn control_error_response(message: impl Into<String>) -> serde_json::Value {
    let message: String = message.into();
    serde_json::json!({
        "status": "error",
        "message": message,
    })
}
/// Validates an engine control request body.
///
/// Returns `None` when `body` is a JSON object, otherwise an error response
/// explaining that an object is required.
fn control_request_body_error(body: &serde_json::Value) -> Option<serde_json::Value> {
    if !body.is_object() {
        return Some(control_error_response(
            "engine control request body must be a JSON object",
        ));
    }
    None
}
/// Builds the route callback that forwards a request body to
/// `engine.engine_control` for `control_name`.
///
/// Engine errors are converted to `anyhow` errors carrying the error's string form.
fn engine_control_callback(
    control_name: String,
    engine: Arc<dyn LLMEngine>,
) -> EngineRouteCallback {
    Arc::new(move |body| {
        // Each invocation gets its own handles so the callback stays reusable.
        let name = control_name.clone();
        let target = engine.clone();
        Box::pin(async move {
            let outcome = target.engine_control(name, body).await;
            outcome.map_err(|e| anyhow::anyhow!(e.to_string()))
        })
    })
}
/// Wraps an engine control callback with the discovery behaviour its policy requires.
///
/// * `Direct`: the callback is invoked unchanged.
/// * `UnregisterBefore`: the body must be a JSON object; then, under
///   `control_lock`, the endpoint instance is unregistered before the callback
///   runs. If the callback fails or reports an error, the endpoint is left
///   unregistered.
/// * `RegisterAfter`: under `control_lock`, the callback runs first and the
///   endpoint instance is re-registered only when the response is not an error.
///
/// Validation, unregister, and re-register failures are returned as `Ok` error
/// responses; only errors from the callback itself are returned as `Err`.
fn wrap_engine_control_callback(
control_name: String,
callback: EngineRouteCallback,
endpoint: dynamo_runtime::component::Endpoint,
control_lock: Arc<tokio::sync::Mutex<()>>,
) -> EngineRouteCallback {
// The policy depends only on the control name, so resolve it once up front.
let policy = engine_control_policy(&control_name);
Arc::new(move |body| {
let callback = callback.clone();
let endpoint = endpoint.clone();
let control_name = control_name.clone();
let control_lock = control_lock.clone();
Box::pin(async move {
match policy {
EngineControlPolicy::Direct => callback(body).await,
EngineControlPolicy::UnregisterBefore => {
// Reject malformed bodies before taking the lock or touching discovery.
if let Some(response) = control_request_body_error(&body) {
return Ok(response);
}
// Held for the rest of this arm, covering unregister and the callback.
let _guard = control_lock.lock().await;
if let Err(e) = endpoint.unregister_endpoint_instance().await {
return Ok(control_error_response(format!(
"failed to unregister endpoint before /engine/{control_name}: {e}"
)));
}
match callback(body).await {
Ok(response) => {
if control_response_is_error(&response) {
tracing::warn!(
control = %control_name,
"engine control returned an error after endpoint unregister; leaving endpoint unregistered"
);
}
Ok(response)
}
Err(e) => {
tracing::warn!(
control = %control_name,
error = %e,
"engine control callback failed after endpoint unregister; leaving endpoint unregistered"
);
Err(e)
}
}
}
EngineControlPolicy::RegisterAfter => {
// Held for the rest of this arm, covering the callback and re-registration.
let _guard = control_lock.lock().await;
let response = callback(body).await?;
// Re-register only after a successful resume; a registration failure
// replaces the engine's response with an error that asks for a retry.
if !control_response_is_error(&response)
&& let Err(e) = endpoint.register_endpoint_instance().await
{
return Ok(control_error_response(format!(
"engine resumed but re-registration failed after /engine/{control_name}: {e}; retry /engine/{control_name} to rejoin discovery"
)));
}
Ok(response)
}
}
})
})
}
/// Shorthand for building a `DynamoError` from an error type and a message.
fn err(error_type: ErrorType, message: impl Into<String>) -> DynamoError {
DynamoError::builder()
.error_type(error_type)
.message(message)
.build()
}
/// Picks the served model name: the worker config's name when set, otherwise
/// the engine config's served name (which may itself be `None`).
fn resolve_served_name(config: &WorkerConfig, engine_config: &EngineConfig) -> Option<String> {
    match config.served_model_name.as_ref() {
        Some(name) => Some(name.clone()),
        None => engine_config.served_model_name.clone(),
    }
}
/// Determines the model type to register.
///
/// Prefill workers always register as `ModelType::Prefill`; every other mode
/// derives the type from `endpoint_types`.
///
/// # Errors
///
/// Returns an error when `endpoint_types` is empty or names an unknown type
/// (non-prefill modes only).
fn resolve_model_type(config: &WorkerConfig) -> Result<ModelType, DynamoError> {
    if !config.disaggregation_mode.is_prefill() {
        return parse_endpoint_types(&config.endpoint_types);
    }
    Ok(ModelType::Prefill)
}
/// Maps the disaggregation mode to the worker's own type and the worker types
/// it needs.
///
/// Prefill needs decode, decode needs prefill, and aggregated workers need
/// nothing (an empty list).
fn resolve_worker_type_and_needs(config: &WorkerConfig) -> (WorkerType, Vec<Vec<WorkerType>>) {
    let (own, peer) = match config.disaggregation_mode {
        DisaggregationMode::Prefill => (WorkerType::Prefill, Some(WorkerType::Decode)),
        DisaggregationMode::Decode => (WorkerType::Decode, Some(WorkerType::Prefill)),
        DisaggregationMode::Aggregated => (WorkerType::Aggregated, None),
    };
    let needs = match peer {
        Some(required) => vec![vec![required]],
        None => Vec::new(),
    };
    (own, needs)
}
/// Parses a comma-separated list of endpoint types into a `ModelType` flag set.
///
/// Entries are trimmed and lower-cased, and blank entries are ignored.
/// Recognised names are `chat`, `completions`, `embedding` / `embeddings`,
/// `tensor`, and `prefill`.
///
/// # Errors
///
/// Returns `InvalidArgument` for an unknown entry or when no entry remains
/// after blanks are dropped.
fn parse_endpoint_types(s: &str) -> Result<ModelType, DynamoError> {
    let mut flags = ModelType::empty();
    let mut seen_any = false;

    let entries = s
        .split(',')
        .map(|piece| piece.trim().to_ascii_lowercase())
        .filter(|entry| !entry.is_empty());

    for entry in entries {
        flags |= match entry.as_str() {
            "chat" => ModelType::Chat,
            "completions" => ModelType::Completions,
            "embedding" | "embeddings" => ModelType::Embedding,
            "tensor" => ModelType::TensorBased,
            "prefill" => ModelType::Prefill,
            other => {
                return Err(err(
                    ErrorType::Backend(BackendError::InvalidArgument),
                    format!("unknown endpoint type '{other}'"),
                ));
            }
        };
        seen_any = true;
    }

    if seen_any {
        Ok(flags)
    } else {
        Err(err(
            ErrorType::Backend(BackendError::InvalidArgument),
            "endpoint_types cannot be empty",
        ))
    }
}
/// Checks that the worker can handle the configured model input.
///
/// # Errors
///
/// Returns `InvalidArgument` for every input kind other than `ModelInput::Tokens`.
fn validate_model_input(model_input: ModelInput) -> Result<(), DynamoError> {
    if model_input != ModelInput::Tokens {
        let message = format!(
            "dynamo_backend_common::Worker currently supports only ModelInput::Tokens; got '{}'. \
            ModelInput::Text and ModelInput::Tensor require dedicated raw-request adapters.",
            model_input.as_str()
        );
        return Err(err(
            ErrorType::Backend(BackendError::InvalidArgument),
            message,
        ));
    }
    Ok(())
}
/// Builds the `LocalModel` that is published for this worker.
///
/// Combines the worker config (parsers, structural-tag settings, template,
/// local-indexer flag) with what the engine reported (capacity limits,
/// data-parallel layout, bootstrap endpoint, runtime data). When
/// `config.model_name` is non-empty it is used as a local path if it exists
/// on disk, otherwise it is fetched.
///
/// # Errors
///
/// Returns an error when the model path cannot be checked, the fetch fails,
/// or the builder fails.
async fn build_local_model(
    config: &WorkerConfig,
    engine_config: &EngineConfig,
) -> Result<LocalModel, DynamoError> {
    // Configured name, then engine-served name, then engine model; an empty
    // result is treated as "no name".
    let served_name = Some(
        resolve_served_name(config, engine_config).unwrap_or_else(|| engine_config.model.clone()),
    )
    .filter(|name| !name.is_empty());
    let enable_local_indexer = config.effective_enable_local_indexer();

    // Published only when the engine reports both a bootstrap host and a port.
    let disaggregated_endpoint = engine_config
        .bootstrap_host
        .as_ref()
        .zip(engine_config.bootstrap_port)
        .map(|(host, port)| {
            tracing::info!(
                bootstrap_host = %host,
                bootstrap_port = port,
                "Publishing disaggregated_endpoint for prefill worker"
            );
            DisaggregatedEndpoint {
                bootstrap_host: Some(host.clone()),
                bootstrap_port: Some(port),
            }
        });

    let runtime_config = ModelRuntimeConfig {
        total_kv_blocks: engine_config.total_kv_blocks,
        max_num_seqs: engine_config.max_num_seqs,
        max_num_batched_tokens: engine_config.max_num_batched_tokens,
        data_parallel_size: engine_config.data_parallel_size.unwrap_or(1),
        data_parallel_start_rank: engine_config.data_parallel_start_rank.unwrap_or(0),
        tool_call_parser: config.tool_call_parser.clone(),
        reasoning_parser: config.reasoning_parser.clone(),
        exclude_tools_when_tool_choice_none: config.exclude_tools_when_tool_choice_none,
        structural_tag_mode: config.structural_tag_mode,
        structural_tag_scope: config.structural_tag_scope,
        structural_tag_schema: config.structural_tag_schema,
        enable_local_indexer,
        disaggregated_endpoint,
        runtime_data: engine_config.runtime_data.clone(),
        ..ModelRuntimeConfig::default()
    };

    let mut builder = LocalModelBuilder::default();
    builder
        .model_name(served_name)
        .context_length(engine_config.context_length)
        .kv_cache_block_size(engine_config.kv_cache_block_size)
        .custom_template_path(config.custom_jinja_template.clone())
        .runtime_config(runtime_config);

    if !config.model_name.is_empty() {
        let source = config.model_name.clone();
        let present_on_disk = std::fs::exists(&source).map_err(|e| {
            err(
                ErrorType::Backend(BackendError::InvalidArgument),
                format!("model path: {e}"),
            )
        })?;
        let local_path = if present_on_disk {
            PathBuf::from(&source)
        } else {
            LocalModel::fetch(&source, false).await.map_err(|e| {
                err(
                    ErrorType::Backend(BackendError::CannotConnect),
                    format!("fetch '{source}': {e}"),
                )
            })?
        };
        builder.model_path(local_path);
        builder.source_path(PathBuf::from(source));
    }

    builder.build().await.map_err(|e| {
        err(
            ErrorType::Backend(BackendError::Unknown),
            format!("build local model: {e}"),
        )
    })
}
#[cfg(test)]
mod tests {
use super::*;
fn error_type_of(result: Result<ModelType, DynamoError>) -> ErrorType {
result.unwrap_err().error_type()
}
#[test]
fn parse_endpoint_types_happy_path() {
let got = parse_endpoint_types("chat,completions").unwrap();
assert_eq!(got, ModelType::Chat | ModelType::Completions);
}
#[test]
fn parse_endpoint_types_single() {
assert_eq!(parse_endpoint_types("chat").unwrap(), ModelType::Chat);
assert_eq!(
parse_endpoint_types("completions").unwrap(),
ModelType::Completions
);
assert_eq!(
parse_endpoint_types("embedding").unwrap(),
ModelType::Embedding
);
}
#[test]
fn parse_endpoint_types_trims_and_lowercases() {
let got = parse_endpoint_types(" Chat , COMPLETIONS ").unwrap();
assert_eq!(got, ModelType::Chat | ModelType::Completions);
}
#[test]
fn parse_endpoint_types_rejects_empty() {
assert_eq!(
error_type_of(parse_endpoint_types("")),
ErrorType::Backend(BackendError::InvalidArgument)
);
assert_eq!(
error_type_of(parse_endpoint_types(" , ")),
ErrorType::Backend(BackendError::InvalidArgument)
);
}
#[test]
fn parse_endpoint_types_rejects_unknown() {
let e = parse_endpoint_types("chat,bogus").unwrap_err();
assert_eq!(
e.error_type(),
ErrorType::Backend(BackendError::InvalidArgument)
);
assert!(e.to_string().contains("bogus"));
}
#[test]
fn engine_control_policy_wraps_discovery_mutating_controls() {
assert_eq!(
engine_control_policy("start_profile"),
EngineControlPolicy::Direct
);
assert_eq!(
engine_control_policy("stop_profile"),
EngineControlPolicy::Direct
);
assert_eq!(
engine_control_policy("update_weights_from_disk"),
EngineControlPolicy::Direct
);
assert_eq!(
engine_control_policy("sleep"),
EngineControlPolicy::UnregisterBefore
);
assert_eq!(
engine_control_policy("release_memory_occupation"),
EngineControlPolicy::UnregisterBefore
);
assert_eq!(
engine_control_policy("wake_up"),
EngineControlPolicy::RegisterAfter
);
assert_eq!(
engine_control_policy("resume_memory_occupation"),
EngineControlPolicy::RegisterAfter
);
}
#[test]
fn control_request_body_validation_requires_json_object() {
assert!(control_request_body_error(&serde_json::json!({})).is_none());
assert!(control_request_body_error(&serde_json::json!({"tags": ["kv_cache"]})).is_none());
for body in [
serde_json::json!(null),
serde_json::json!(true),
serde_json::json!("bad"),
serde_json::json!(["kv_cache"]),
] {
let response = control_request_body_error(&body).unwrap();
assert!(control_response_is_error(&response));
assert_eq!(
response.get("message").and_then(|value| value.as_str()),
Some("engine control request body must be a JSON object")
);
}
}
#[test]
fn control_response_error_detection_matches_backend_conventions() {
assert!(control_response_is_error(&serde_json::json!({
"status": "error"
})));
assert!(control_response_is_error(&serde_json::json!({
"status": "ERROR"
})));
assert!(control_response_is_error(&serde_json::json!({
"success": false
})));
assert!(!control_response_is_error(&serde_json::json!({
"status": "ok"
})));
assert!(!control_response_is_error(&serde_json::json!({
"success": true
})));
assert!(!control_response_is_error(&serde_json::json!({
"message": "ok"
})));
}
#[test]
fn validate_model_input_accepts_tokens() {
validate_model_input(ModelInput::Tokens).unwrap();
}
#[test]
fn validate_model_input_rejects_text_and_tensor() {
for input in [ModelInput::Text, ModelInput::Tensor] {
let e = validate_model_input(input).unwrap_err();
assert_eq!(
e.error_type(),
ErrorType::Backend(BackendError::InvalidArgument)
);
assert!(e.to_string().contains(input.as_str()));
}
}
#[tokio::test]
async fn build_local_model_carries_runtime_parser_settings() {
let config = WorkerConfig {
tool_call_parser: Some("kimi_k2".to_string()),
reasoning_parser: Some("kimi_k25".to_string()),
exclude_tools_when_tool_choice_none: false,
enable_local_indexer: false,
..WorkerConfig::default()
};
let engine_config = EngineConfig {
model: "nvidia/Kimi-K2.5-NVFP4".to_string(),
total_kv_blocks: Some(100),
max_num_seqs: Some(16),
max_num_batched_tokens: Some(8192),
runtime_data: [(
"sglang_worker_group_id".to_string(),
serde_json::json!("group-a"),
)]
.into(),
..EngineConfig::default()
};
let local_model = build_local_model(&config, &engine_config).await.unwrap();
let runtime_config = local_model.runtime_config();
assert_eq!(runtime_config.total_kv_blocks, Some(100));
assert_eq!(runtime_config.max_num_seqs, Some(16));
assert_eq!(runtime_config.max_num_batched_tokens, Some(8192));
assert_eq!(runtime_config.tool_call_parser.as_deref(), Some("kimi_k2"));
assert_eq!(runtime_config.reasoning_parser.as_deref(), Some("kimi_k25"));
assert!(!runtime_config.exclude_tools_when_tool_choice_none);
assert!(!runtime_config.enable_local_indexer);
assert_eq!(
runtime_config
.runtime_data
.get("sglang_worker_group_id")
.and_then(|value| value.as_str()),
Some("group-a")
);
}
#[test]
fn resolve_model_type_aggregated_uses_endpoint_types() {
let config = WorkerConfig {
endpoint_types: "chat,completions".to_string(),
disaggregation_mode: DisaggregationMode::Aggregated,
..WorkerConfig::default()
};
assert_eq!(
resolve_model_type(&config).unwrap(),
ModelType::Chat | ModelType::Completions,
);
}
#[test]
fn resolve_model_type_decode_uses_endpoint_types() {
let config = WorkerConfig {
endpoint_types: "chat".to_string(),
disaggregation_mode: DisaggregationMode::Decode,
..WorkerConfig::default()
};
assert_eq!(resolve_model_type(&config).unwrap(), ModelType::Chat);
}
#[test]
fn resolve_model_type_prefill_overrides_endpoint_types() {
let config = WorkerConfig {
endpoint_types: "chat,completions".to_string(),
disaggregation_mode: DisaggregationMode::Prefill,
..WorkerConfig::default()
};
assert_eq!(resolve_model_type(&config).unwrap(), ModelType::Prefill);
}
#[tokio::test]
async fn build_local_model_decode_disables_local_indexer() {
let config = WorkerConfig {
enable_local_indexer: true,
disaggregation_mode: DisaggregationMode::Decode,
..WorkerConfig::default()
};
let engine_config = EngineConfig {
model: "test/model".to_string(),
..EngineConfig::default()
};
let local_model = build_local_model(&config, &engine_config).await.unwrap();
assert!(!local_model.runtime_config().enable_local_indexer);
}
#[tokio::test]
async fn build_local_model_aggregated_keeps_local_indexer() {
let config = WorkerConfig {
enable_local_indexer: true,
disaggregation_mode: DisaggregationMode::Aggregated,
..WorkerConfig::default()
};
let engine_config = EngineConfig {
model: "test/model".to_string(),
..EngineConfig::default()
};
let local_model = build_local_model(&config, &engine_config).await.unwrap();
assert!(local_model.runtime_config().enable_local_indexer);
}
#[tokio::test]
async fn build_local_model_publishes_disaggregated_endpoint_when_engine_provides_it() {
let config = WorkerConfig {
disaggregation_mode: DisaggregationMode::Prefill,
..WorkerConfig::default()
};
let engine_config = EngineConfig {
model: "test/model".to_string(),
bootstrap_host: Some("10.0.0.5".to_string()),
bootstrap_port: Some(12345),
..EngineConfig::default()
};
let local_model = build_local_model(&config, &engine_config).await.unwrap();
let endpoint = local_model
.runtime_config()
.disaggregated_endpoint
.as_ref()
.expect("disaggregated_endpoint must be published");
assert_eq!(endpoint.bootstrap_host.as_deref(), Some("10.0.0.5"));
assert_eq!(endpoint.bootstrap_port, Some(12345));
}
#[tokio::test]
async fn build_local_model_skips_disaggregated_endpoint_when_engine_omits_it() {
let config = WorkerConfig::default();
let engine_config = EngineConfig {
model: "test/model".to_string(),
..EngineConfig::default()
};
let local_model = build_local_model(&config, &engine_config).await.unwrap();
assert!(
local_model
.runtime_config()
.disaggregated_endpoint
.is_none()
);
}
use crate::engine::PreprocessedRequest;
use async_trait::async_trait;
use futures::stream::BoxStream;
use std::sync::atomic::{AtomicUsize, Ordering};
struct StateMockEngine {
start_should_fail: bool,
cleanup_calls: Arc<AtomicUsize>,
}
impl StateMockEngine {
fn new(start_should_fail: bool) -> (Arc<Self>, Arc<AtomicUsize>) {
let cleanup_calls = Arc::new(AtomicUsize::new(0));
let eng = Arc::new(Self {
start_should_fail,
cleanup_calls: cleanup_calls.clone(),
});
(eng, cleanup_calls)
}
}
#[async_trait]
impl LLMEngine for StateMockEngine {
    /// Returns a config for model "mock", or a synthetic `EngineShutdown`
    /// backend error when `start_should_fail` is set.
    async fn start(&self, _worker_id: u64) -> Result<EngineConfig, DynamoError> {
        if !self.start_should_fail {
            return Ok(EngineConfig {
                model: "mock".to_string(),
                ..EngineConfig::default()
            });
        }
        Err(err(
            ErrorType::Backend(BackendError::EngineShutdown),
            "synthetic start failure",
        ))
    }

    /// Never expected to be called by the state machine tests; panics if it is.
    async fn generate(
        &self,
        _request: PreprocessedRequest,
        _ctx: crate::engine::GenerateContext,
    ) -> Result<
        BoxStream<'static, Result<crate::engine::LLMEngineOutput, DynamoError>>,
        DynamoError,
    > {
        unreachable!("not used in state machine tests")
    }

    /// Records the call in the shared counter and always succeeds.
    async fn cleanup(&self) -> Result<(), DynamoError> {
        let _calls_before = self.cleanup_calls.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}
/// Wraps `engine` in a `Worker` that uses the default `WorkerConfig`.
fn worker_with(engine: Arc<dyn LLMEngine>) -> Worker {
    let default_config = WorkerConfig::default();
    Worker::new(engine, default_config)
}
/// A successful `start_engine` returns the engine's config and leaves the
/// worker in `LifecycleState::Running`.
#[tokio::test]
async fn start_engine_init_to_running_on_success() {
    let (mock, _) = StateMockEngine::new(false);
    let mut worker = worker_with(mock);

    let started = worker.start_engine(0).await;
    let cfg = started.expect("start");

    assert_eq!(cfg.model, "mock");
    assert_eq!(worker.state, LifecycleState::Running);
}
/// A failing engine start surfaces the error and moves the worker to
/// `LifecycleState::StartFailed`.
#[tokio::test]
async fn start_engine_init_to_start_failed_on_failure() {
    let (mock, _) = StateMockEngine::new(true);
    let mut worker = worker_with(mock);

    let outcome = worker.start_engine(0).await;

    assert!(outcome.is_err(), "start should fail");
    assert_eq!(worker.state, LifecycleState::StartFailed);
}
/// After a failed start, the first `cleanup_once` must call the engine's
/// `cleanup` exactly once and end in `Stopped`; a second call changes nothing.
#[tokio::test]
async fn cleanup_once_runs_engine_cleanup_after_failed_start() {
    let (mock, cleanup_calls) = StateMockEngine::new(true);
    let observed_calls = || cleanup_calls.load(Ordering::SeqCst);
    let mut worker = worker_with(mock);

    // The start error itself is irrelevant here; only the resulting state matters.
    let _ = worker.start_engine(0).await;

    worker.cleanup_once().await;
    assert_eq!(
        observed_calls(),
        1,
        "engine.cleanup() must run exactly once after a failed start \
         so engines don't have to re-implement the guard"
    );
    assert_eq!(worker.state, LifecycleState::Stopped);

    // Second invocation: counter and state must stay put.
    worker.cleanup_once().await;
    assert_eq!(observed_calls(), 1);
    assert_eq!(worker.state, LifecycleState::Stopped);
}
/// Calling `cleanup_once` three times after a successful start still reaches
/// the engine's `cleanup` only once.
#[tokio::test]
async fn cleanup_once_is_idempotent() {
    let (mock, cleanup_calls) = StateMockEngine::new(false);
    let mut worker = worker_with(mock);
    worker.start_engine(0).await.unwrap();

    for _ in 0..3 {
        worker.cleanup_once().await;
    }

    let total = cleanup_calls.load(Ordering::SeqCst);
    assert_eq!(total, 1);
    assert_eq!(worker.state, LifecycleState::Stopped);
}
/// Without a prior `start_engine`, `cleanup_once` must not call the engine's
/// `cleanup`, yet the worker still ends in `Stopped`.
#[tokio::test]
async fn cleanup_once_noops_when_never_started() {
    let (mock, cleanup_calls) = StateMockEngine::new(false);
    let mut worker = worker_with(mock);

    worker.cleanup_once().await;

    let total = cleanup_calls.load(Ordering::SeqCst);
    assert_eq!(total, 0);
    assert_eq!(worker.state, LifecycleState::Stopped);
}
use std::sync::Mutex as StdMutex;
/// Mock engine that appends the name of each lifecycle hook it receives to a
/// shared log, so tests can assert on call order.
struct OrderingMockEngine {
    // Hook names in the order they were invoked; the test holds a clone.
    log: Arc<StdMutex<Vec<&'static str>>>,
    // When true, `drain` returns an error after logging itself.
    drain_should_fail: bool,
}

impl OrderingMockEngine {
    /// Builds the mock and returns it together with a handle to its call log.
    fn new(drain_should_fail: bool) -> (Arc<Self>, Arc<StdMutex<Vec<&'static str>>>) {
        let shared_log = Arc::new(StdMutex::new(Vec::new()));
        let engine = Self {
            log: Arc::clone(&shared_log),
            drain_should_fail,
        };
        (Arc::new(engine), shared_log)
    }
}
#[async_trait]
impl LLMEngine for OrderingMockEngine {
    /// Logs "start" and returns a config for model "mock".
    async fn start(&self, _worker_id: u64) -> Result<EngineConfig, DynamoError> {
        self.log.lock().unwrap().push("start");
        let config = EngineConfig {
            model: "mock".to_string(),
            ..EngineConfig::default()
        };
        Ok(config)
    }

    /// Never expected to be called by the orchestrator tests; panics if it is.
    async fn generate(
        &self,
        _request: PreprocessedRequest,
        _ctx: crate::engine::GenerateContext,
    ) -> Result<
        BoxStream<'static, Result<crate::engine::LLMEngineOutput, DynamoError>>,
        DynamoError,
    > {
        unreachable!("not used in orchestrator tests")
    }

    /// Logs "drain" first, then fails with a synthetic backend error when
    /// `drain_should_fail` is set.
    async fn drain(&self) -> Result<(), DynamoError> {
        self.log.lock().unwrap().push("drain");
        if !self.drain_should_fail {
            return Ok(());
        }
        Err(err(
            ErrorType::Backend(BackendError::Unknown),
            "synthetic drain failure",
        ))
    }

    /// Logs "cleanup" and always succeeds.
    async fn cleanup(&self) -> Result<(), DynamoError> {
        self.log.lock().unwrap().push("cleanup");
        Ok(())
    }
}
/// With a zero grace period, the shutdown steps must invoke `drain` and then
/// `cleanup`, in that order, after the initial `start`.
#[tokio::test]
async fn shutdown_steps_run_drain_before_cleanup() {
    let (mock, call_log) = OrderingMockEngine::new(false);
    let mut worker = worker_with(mock);
    worker.start_engine(0).await.unwrap();

    worker.run_engine_shutdown_steps_with_grace(0.0).await;

    // Snapshot the log so the lock is released before asserting.
    let recorded = call_log.lock().unwrap().clone();
    let expected = vec!["start", "drain", "cleanup"];
    assert_eq!(recorded, expected, "drain must run before cleanup");
}
/// A failing `drain` must not prevent `cleanup` from running, and the worker
/// must still end in `Stopped`.
#[tokio::test]
async fn shutdown_steps_drain_failure_does_not_block_cleanup() {
    // `true` makes the mock's drain hook return an error.
    let (mock, call_log) = OrderingMockEngine::new(true);
    let mut worker = worker_with(mock);
    worker.start_engine(0).await.unwrap();

    worker.run_engine_shutdown_steps_with_grace(0.0).await;

    let recorded = call_log.lock().unwrap().clone();
    let expected = vec!["start", "drain", "cleanup"];
    assert_eq!(recorded, expected);
    assert_eq!(worker.state, LifecycleState::Stopped);
}
use std::sync::Mutex;
/// Serializes the tests that read or mutate process environment variables.
static ENV_LOCK: Mutex<()> = Mutex::new(());

/// Runs `f` with the environment variable `key` set to `value` (or removed
/// when `value` is `None`) and returns whatever `f` returns.
///
/// The previous value of `key` is put back when this function exits, whether
/// `f` returns normally or panics. Restoring on the panic path matters because
/// callers typically assert inside `f`: without it, one failing assertion
/// would leave the override in place for every later test in the process.
///
/// `ENV_LOCK` is held for the whole call. A lock poisoned by an earlier
/// panicking test is recovered rather than propagated.
fn with_env<F: FnOnce() -> R, R>(key: &str, value: Option<&str>, f: F) -> R {
    /// Restores the saved value of `key` when dropped.
    struct Restore<'a> {
        key: &'a str,
        // `OsString` so a non-UTF-8 previous value is restored unchanged.
        prev: Option<std::ffi::OsString>,
    }

    impl<'a> Drop for Restore<'a> {
        fn drop(&mut self) {
            // SAFETY: the guard is dropped before the `ENV_LOCK` guard (locals
            // drop in reverse declaration order), so no other `with_env`
            // caller mutates the environment concurrently.
            unsafe {
                match self.prev.take() {
                    Some(v) => std::env::set_var(self.key, v),
                    None => std::env::remove_var(self.key),
                }
            }
        }
    }

    let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    // Declared after `_guard` so it runs first on exit, while the lock is held.
    let _restore = Restore {
        key,
        prev: std::env::var_os(key),
    };

    // SAFETY: `ENV_LOCK` is held, see above.
    unsafe {
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
    }

    f()
}
/// With the grace-period variable removed, `grace_period_secs` must report
/// `DEFAULT_GRACE_PERIOD_SECS`.
#[test]
fn grace_period_default_when_unset() {
    let observed = with_env(GRACE_PERIOD_ENV, None, grace_period_secs);
    assert_eq!(observed, DEFAULT_GRACE_PERIOD_SECS);
}
/// A well-formed decimal in the grace-period variable is returned as-is.
#[test]
fn grace_period_parses_valid_value() {
    let observed = with_env(GRACE_PERIOD_ENV, Some("2.5"), grace_period_secs);
    assert_eq!(observed, 2.5);
}
/// A negative grace period in the environment must come back as `0.0`.
#[test]
fn grace_period_clamps_negative_to_zero() {
    let observed = with_env(GRACE_PERIOD_ENV, Some("-1"), grace_period_secs);
    assert_eq!(observed, 0.0);
}
/// A non-numeric grace period must yield `DEFAULT_GRACE_PERIOD_SECS`.
#[test]
fn grace_period_falls_back_to_default_on_parse_error() {
    let observed = with_env(GRACE_PERIOD_ENV, Some("not-a-number"), grace_period_secs);
    assert_eq!(observed, DEFAULT_GRACE_PERIOD_SECS);
}
/// An empty grace-period variable must behave like an unset one and yield
/// `DEFAULT_GRACE_PERIOD_SECS`.
#[test]
fn grace_period_treats_empty_as_unset() {
    let observed = with_env(GRACE_PERIOD_ENV, Some(""), grace_period_secs);
    assert_eq!(observed, DEFAULT_GRACE_PERIOD_SECS);
}
/// A JSON object in the health-check payload variable is loaded and its
/// fields are readable.
#[test]
fn health_check_payload_env_returns_object() {
    let raw = r#"{"token_ids":[1]}"#;
    let loaded = with_env(
        HEALTH_CHECK_PAYLOAD_ENV,
        Some(raw),
        load_health_check_payload_from_env,
    );
    let got = loaded.unwrap();
    assert_eq!(got["token_ids"], serde_json::json!([1]));
}
/// Valid JSON that is not an object (here an array) must be rejected.
#[test]
fn health_check_payload_env_rejects_non_object() {
    let loaded = with_env(
        HEALTH_CHECK_PAYLOAD_ENV,
        Some("[1,2,3]"),
        load_health_check_payload_from_env,
    );
    assert!(loaded.is_none());
}
/// Stamping an object sets `HEALTH_CHECK_KEY` to `true` and keeps the
/// existing fields.
#[test]
fn stamp_canary_marker_injects_into_object() {
    let payload = serde_json::json!({"token_ids": [1]});

    let stamped = stamp_canary_marker(payload).unwrap();

    let marker = &stamped[crate::engine::HEALTH_CHECK_KEY];
    assert_eq!(*marker, serde_json::json!(true));
    let token_ids = &stamped["token_ids"];
    assert_eq!(*token_ids, serde_json::json!([1]));
}
/// Arrays and scalars cannot carry the marker, so stamping returns `None`.
#[test]
fn stamp_canary_marker_rejects_non_object() {
    let non_objects = [serde_json::json!([1, 2, 3]), serde_json::json!(42)];
    for payload in non_objects {
        assert!(stamp_canary_marker(payload).is_none());
    }
}
/// A payload that already carries `HEALTH_CHECK_KEY: false` must come back
/// with the marker forced to `true`.
#[test]
fn stamp_canary_marker_overrides_falsy_marker() {
    let payload = serde_json::json!({crate::engine::HEALTH_CHECK_KEY: false});

    let stamped = stamp_canary_marker(payload).unwrap();

    let marker = &stamped[crate::engine::HEALTH_CHECK_KEY];
    assert_eq!(*marker, serde_json::json!(true));
}
// Name of the environment variable that the `shutdown_timeout_*` tests pass to
// `with_env`. Aliased from `dynamo_runtime` so the tests use the runtime's own
// constant instead of a duplicated string literal.
const SHUTDOWN_TIMEOUT_ENV: &str =
dynamo_runtime::config::environment_names::worker::DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT;
/// Default graceful-shutdown timeout, in seconds, for the current build
/// profile: the runtime's debug constant when `debug_assertions` is on, the
/// release constant otherwise.
fn expected_default_timeout_secs() -> u64 {
    let is_debug_build = cfg!(debug_assertions);
    match is_debug_build {
        true => dynamo_runtime::worker::DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG,
        false => dynamo_runtime::worker::DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE,
    }
}
/// With the timeout variable removed, `graceful_shutdown_timeout` must return
/// the build-profile default.
#[test]
fn shutdown_timeout_default_when_unset() {
    let observed = with_env(SHUTDOWN_TIMEOUT_ENV, None, graceful_shutdown_timeout);
    let expected = Duration::from_secs(expected_default_timeout_secs());
    assert_eq!(observed, expected);
}
/// An integer in the timeout variable is interpreted as whole seconds.
#[test]
fn shutdown_timeout_parses_valid_value() {
    let observed = with_env(SHUTDOWN_TIMEOUT_ENV, Some("42"), graceful_shutdown_timeout);
    assert_eq!(observed, Duration::from_secs(42));
}
/// A non-numeric timeout must yield the build-profile default.
#[test]
fn shutdown_timeout_falls_back_to_default_on_parse_error() {
    let observed = with_env(
        SHUTDOWN_TIMEOUT_ENV,
        Some("not-a-number"),
        graceful_shutdown_timeout,
    );
    let expected = Duration::from_secs(expected_default_timeout_secs());
    assert_eq!(observed, expected);
}
/// An empty timeout variable must behave like an unset one and yield the
/// build-profile default.
#[test]
fn shutdown_timeout_treats_empty_as_unset() {
    let observed = with_env(SHUTDOWN_TIMEOUT_ENV, Some(""), graceful_shutdown_timeout);
    let expected = Duration::from_secs(expected_default_timeout_secs());
    assert_eq!(observed, expected);
}
/// `shutdown_deadline` must return the timeout extended by the grace period,
/// including fractional-second graces.
#[test]
fn shutdown_deadline_adds_grace_to_timeout() {
    // (timeout, grace in seconds, expected deadline)
    let cases = [
        (Duration::from_secs(5), 5.0, Duration::from_secs(10)),
        (Duration::from_secs(30), 0.0, Duration::from_secs(30)),
        (Duration::from_secs(2), 0.5, Duration::from_millis(2_500)),
    ];
    for (timeout, grace, expected) in cases {
        assert_eq!(shutdown_deadline(timeout, grace), expected);
    }
}
/// A negative grace must not shorten the deadline below the timeout itself.
#[test]
fn shutdown_deadline_clamps_negative_grace() {
    let timeout = Duration::from_secs(5);
    let deadline = shutdown_deadline(timeout, -1.0);
    assert_eq!(deadline, Duration::from_secs(5));
}
/// Pins the failure mode of using the bare timeout as the deadline: with a 5s
/// timeout and a 5.1s grace, the timeout fires while only "start" has been
/// logged, so neither `drain` nor `cleanup` is observed.
///
/// Runs on tokio's paused clock, so no real time elapses.
#[tokio::test(start_paused = true)]
async fn timeout_alone_starves_drain_cleanup_when_grace_meets_timeout() {
    let (mock, call_log) = OrderingMockEngine::new(false);
    let mut worker = worker_with(mock);
    worker.start_engine(0).await.unwrap();

    let timeout = Duration::from_secs(5);
    // Slightly longer than the timeout on purpose.
    let grace = 5.1;

    let shutdown = worker.run_engine_shutdown_steps_with_grace(grace);
    let result = tokio::time::timeout(timeout, shutdown).await;
    assert!(
        result.is_err(),
        "buggy deadline must expire before drain/cleanup run"
    );

    let recorded = call_log.lock().unwrap().clone();
    let expected = vec!["start"];
    assert_eq!(
        recorded, expected,
        "drain and cleanup must not have been observed"
    );
}
/// With the deadline computed by `shutdown_deadline` (timeout plus grace), the
/// same 5s / 5.1s scenario completes: `drain` and `cleanup` are both logged
/// before the outer timeout fires.
///
/// Runs on tokio's paused clock, so no real time elapses.
#[tokio::test(start_paused = true)]
async fn shutdown_deadline_reserves_grace_so_drain_cleanup_complete() {
    let (mock, call_log) = OrderingMockEngine::new(false);
    let mut worker = worker_with(mock);
    worker.start_engine(0).await.unwrap();

    let timeout = Duration::from_secs(5);
    let grace = 5.1;
    let deadline = shutdown_deadline(timeout, grace);

    let shutdown = worker.run_engine_shutdown_steps_with_grace(grace);
    let result = tokio::time::timeout(deadline, shutdown).await;
    assert!(
        result.is_ok(),
        "fixed deadline must allow drain + cleanup to finish"
    );

    let recorded = call_log.lock().unwrap().clone();
    let expected = vec!["start", "drain", "cleanup"];
    assert_eq!(recorded, expected);
}
/// Every `Some` field of `RuntimeConfig` must be written to its `DYN_*`
/// environment variable by `apply_to_env`.
///
/// The written values are read back and the previous environment is restored
/// *before* any assertion runs, so a failing assertion cannot leave the
/// overrides in place for other tests in the process.
#[test]
fn runtime_config_apply_to_env_writes_set_fields() {
    let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let cfg = RuntimeConfig {
        discovery_backend: Some("file".to_string()),
        request_plane: Some("tcp".to_string()),
        event_plane: Some("zmq".to_string()),
    };
    let keys = [
        "DYN_DISCOVERY_BACKEND",
        "DYN_REQUEST_PLANE",
        "DYN_EVENT_PLANE",
    ];
    // `var_os` so a non-UTF-8 previous value is restored unchanged.
    let prev: Vec<_> = keys.iter().map(|k| (*k, std::env::var_os(k))).collect();

    cfg.apply_to_env();
    let observed: Vec<Option<String>> = keys.iter().map(|k| std::env::var(k).ok()).collect();

    // Restore first, assert afterwards.
    for (k, v) in prev {
        // SAFETY: `ENV_LOCK` is held, so no other test that takes the lock
        // mutates the environment concurrently.
        unsafe {
            match v {
                Some(val) => std::env::set_var(k, val),
                None => std::env::remove_var(k),
            }
        }
    }

    assert_eq!(observed[0].as_deref(), Some("file"));
    assert_eq!(observed[1].as_deref(), Some("tcp"));
    assert_eq!(observed[2].as_deref(), Some("zmq"));
}
/// Fields left as `None` in `RuntimeConfig` must not be touched by
/// `apply_to_env`: pre-existing values of `DYN_REQUEST_PLANE` and
/// `DYN_EVENT_PLANE` have to survive the call.
///
/// The config sets `discovery_backend`, so `apply_to_env` also writes
/// `DYN_DISCOVERY_BACKEND`. All three variables are therefore saved up front
/// and restored before asserting; otherwise the "etcd" override would leak
/// into the rest of the test process.
#[test]
fn runtime_config_apply_to_env_leaves_unset_fields_untouched() {
    let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let keys = [
        "DYN_DISCOVERY_BACKEND",
        "DYN_REQUEST_PLANE",
        "DYN_EVENT_PLANE",
    ];
    // `var_os` so a non-UTF-8 previous value is restored unchanged.
    let prev: Vec<_> = keys.iter().map(|k| (*k, std::env::var_os(k))).collect();

    // SAFETY: `ENV_LOCK` is held, so no other test that takes the lock
    // mutates the environment concurrently.
    unsafe {
        std::env::set_var("DYN_REQUEST_PLANE", "preexisting");
        std::env::set_var("DYN_EVENT_PLANE", "preexisting");
    }

    let cfg = RuntimeConfig {
        discovery_backend: Some("etcd".to_string()),
        request_plane: None,
        event_plane: None,
    };
    cfg.apply_to_env();

    let request_plane = std::env::var("DYN_REQUEST_PLANE").ok();
    let event_plane = std::env::var("DYN_EVENT_PLANE").ok();

    // Restore first, assert afterwards.
    for (k, v) in prev {
        // SAFETY: `ENV_LOCK` is still held, see above.
        unsafe {
            match v {
                Some(val) => std::env::set_var(k, val),
                None => std::env::remove_var(k),
            }
        }
    }

    assert_eq!(request_plane.as_deref(), Some("preexisting"));
    assert_eq!(event_plane.as_deref(), Some("preexisting"));
}
}