use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use dynamo_llm::local_model::LocalModel;
use dynamo_llm::local_model::LocalModelBuilder;
use dynamo_llm::local_model::runtime_config::{DisaggregatedEndpoint, ModelRuntimeConfig};
use dynamo_llm::model_type::{ModelInput, ModelType};
use dynamo_runtime::pipeline::network::Ingress;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::{DistributedRuntime, Runtime};
use tokio_util::sync::CancellationToken;
use crate::adapter::EngineAdapter;
use crate::disagg::DisaggregationMode;
use crate::engine::{EngineConfig, LLMEngine};
use crate::error::{BackendError, DynamoError, ErrorType};
/// Grace period, in seconds, used when the `GRACE_PERIOD_ENV` variable is
/// unset, empty, or cannot be parsed as a float.
const DEFAULT_GRACE_PERIOD_SECS: f64 = 5.0;
/// Name of the environment variable that overrides the grace period (seconds).
const GRACE_PERIOD_ENV: &str = "DYN_GRACEFUL_SHUTDOWN_GRACE_PERIOD_SECS";
/// Optional runtime overrides that can be exported as `DYN_*` environment
/// variables via [`RuntimeConfig::apply_to_env`].
#[derive(Clone, Debug, Default)]
pub struct RuntimeConfig {
    /// Exported as `DYN_DISCOVERY_BACKEND` when set.
    pub discovery_backend: Option<String>,
    /// Exported as `DYN_REQUEST_PLANE` when set.
    pub request_plane: Option<String>,
    /// Exported as `DYN_EVENT_PLANE` when set.
    pub event_plane: Option<String>,
}

impl RuntimeConfig {
    /// Returns `true` when at least one override field is populated.
    pub fn has_overrides(&self) -> bool {
        [
            &self.discovery_backend,
            &self.request_plane,
            &self.event_plane,
        ]
        .iter()
        .any(|field| field.is_some())
    }

    /// Writes every populated field to its environment variable; fields that
    /// are `None` leave the corresponding variable untouched.
    pub fn apply_to_env(&self) {
        let overrides = [
            ("DYN_DISCOVERY_BACKEND", self.discovery_backend.as_deref()),
            ("DYN_REQUEST_PLANE", self.request_plane.as_deref()),
            ("DYN_EVENT_PLANE", self.event_plane.as_deref()),
        ];
        for (name, value) in overrides {
            if let Some(value) = value {
                // SAFETY: mutating the process environment is only sound while
                // no other thread accesses it concurrently. NOTE(review):
                // presumably this is called during startup before such threads
                // exist — confirm at the call sites.
                unsafe { std::env::set_var(name, value) };
            }
        }
    }
}
/// Settings that control where a `Worker` registers, which model it serves,
/// and how that model is advertised.
#[derive(Clone, Debug)]
pub struct WorkerConfig {
/// Namespace in which the component is looked up on the distributed runtime.
pub namespace: String,
/// Component name resolved inside `namespace`.
pub component: String,
/// Endpoint name resolved on the component.
pub endpoint: String,
/// Model source: used as a local path when it exists on disk, otherwise
/// fetched. An empty string skips setting a model path altogether.
pub model_name: String,
/// Served name; when set it takes precedence over the engine-reported one.
pub served_model_name: Option<String>,
/// Input kind passed to model registration; only `ModelInput::Tokens` is
/// accepted by `Worker::run`.
pub model_input: ModelInput,
/// Comma-separated endpoint types (for example `"chat,completions"`).
/// Ignored for prefill workers, which always register as `ModelType::Prefill`.
pub endpoint_types: String,
/// Optional template path forwarded to the local model builder.
pub custom_jinja_template: Option<PathBuf>,
/// Copied into the published runtime config.
pub tool_call_parser: Option<String>,
/// Copied into the published runtime config.
pub reasoning_parser: Option<String>,
/// Copied into the published runtime config.
pub exclude_tools_when_tool_choice_none: bool,
/// Requests the local indexer; forced off for decode workers.
pub enable_local_indexer: bool,
/// Extra metric labels handed to the endpoint builder when non-empty.
pub metrics_labels: Vec<(String, String)>,
/// Aggregated / prefill / decode role of this worker.
pub disaggregation_mode: DisaggregationMode,
/// Optional runtime overrides (see `RuntimeConfig`).
pub runtime: RuntimeConfig,
}
/// Defaults: `dynamo.backend.generate`, token input, chat + completions
/// endpoints, aggregated mode, local indexer enabled, no parsers or labels.
impl Default for WorkerConfig {
    fn default() -> Self {
        Self {
            // Addressing.
            namespace: String::from("dynamo"),
            component: String::from("backend"),
            endpoint: String::from("generate"),
            // Model identity.
            model_name: String::new(),
            served_model_name: None,
            model_input: ModelInput::Tokens,
            endpoint_types: String::from("chat,completions"),
            // Optional request-processing features.
            custom_jinja_template: None,
            tool_call_parser: None,
            reasoning_parser: None,
            exclude_tools_when_tool_choice_none: true,
            enable_local_indexer: true,
            metrics_labels: Vec::new(),
            // Topology and runtime.
            disaggregation_mode: DisaggregationMode::Aggregated,
            runtime: RuntimeConfig::default(),
        }
    }
}
/// Tracks how far the engine has progressed so cleanup runs at most once.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LifecycleState {
/// Freshly constructed; `engine.start()` has not been attempted.
Init,
/// `engine.start()` returned `Ok`.
Running,
/// `engine.start()` returned `Err`; cleanup is still expected to run.
StartFailed,
/// Terminal state set by `cleanup_once`.
Stopped,
}
/// Drives one engine through start, serve, and shutdown.
pub struct Worker {
// Engine being driven; shared with the request adapter while serving.
engine: Arc<dyn LLMEngine>,
// Registration and serving settings.
config: WorkerConfig,
// Lifecycle marker used to make engine cleanup run at most once.
state: LifecycleState,
}
impl Worker {
pub fn new(engine: Arc<dyn LLMEngine>, config: WorkerConfig) -> Self {
Self {
engine,
config,
state: LifecycleState::Init,
}
}
/// Runs the worker until serving ends or a shutdown signal arrives.
///
/// Validates `model_input`, installs SIGTERM/SIGINT handlers that cancel a
/// shared token, then drives `run_inner`. Once the token is cancelled,
/// `run_inner` gets `graceful_shutdown_timeout()` plus the grace period to
/// finish; past that deadline the process is terminated with
/// `std::process::exit`. On every returning path `cleanup_once` runs before
/// the outcome of `run_inner` is handed back.
///
/// # Errors
/// Fails when the model input is unsupported, when a signal handler cannot be
/// installed, or when `run_inner` fails.
pub async fn run(mut self, runtime: Runtime) -> Result<(), DynamoError> {
validate_model_input(self.config.model_input)?;
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("install SIGTERM handler: {e}"),
)
})?;
let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("install SIGINT handler: {e}"),
)
})?;
let shutdown_token = CancellationToken::new();
let signal_token = shutdown_token.clone();
// Background task: the first SIGTERM or SIGINT cancels the shared token.
let signal_handle = tokio::spawn(async move {
tokio::select! {
_ = sigterm.recv() => tracing::info!("SIGTERM received"),
_ = sigint.recv() => tracing::info!("SIGINT received"),
}
signal_token.cancel();
});
// Scoped so the mutable borrow of `self` held by `inner_fut` ends before
// `cleanup_once` is called below.
let outcome = {
let inner_fut = self.run_inner(runtime, &shutdown_token);
tokio::pin!(inner_fut);
tokio::select! {
result = &mut inner_fut => result,
_ = shutdown_token.cancelled() => {
// Shutdown requested: keep polling the same future, but only until
// the deadline (timeout + grace) elapses.
let timeout = graceful_shutdown_timeout();
let grace = grace_period_secs();
let deadline = shutdown_deadline(timeout, grace);
tracing::debug!(
"graceful shutdown started; deadline {}s ({}s timeout + {:.2}s grace)",
deadline.as_secs(),
timeout.as_secs(),
grace,
);
match tokio::time::timeout(deadline, &mut inner_fut).await {
Ok(result) => result,
Err(_) => {
tracing::error!(
"Graceful shutdown exceeded {}s; force-exiting with code 911. \
Set DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to override.",
deadline.as_secs()
);
// NOTE(review): Unix exit statuses keep only the low 8 bits, so 911
// would be observed by the parent as 143 — confirm this is intended.
std::process::exit(911);
}
}
}
}
};
// Stop the signal listener and wait for the abort to take effect.
signal_handle.abort();
let _ = signal_handle.await;
// No-op when the orchestrator already cleaned up.
self.cleanup_once().await;
outcome
}
/// Connects to the distributed runtime, starts the engine, and serves.
///
/// The shutdown token is checked twice: before `engine.start()` (returns
/// `Ok(())` without starting anything) and right after it (runs the shutdown
/// orchestration instead of serving). Otherwise control passes to
/// `serve_with_orchestrator`.
///
/// # Errors
/// Returns `CannotConnect` when the runtime or component cannot be obtained,
/// and propagates failures from `engine.start()` and from serving.
async fn run_inner(
&mut self,
runtime: Runtime,
shutdown: &CancellationToken,
) -> Result<(), DynamoError> {
let drt = DistributedRuntime::from_settings(runtime)
.await
.map_err(|e| {
err(
ErrorType::Backend(BackendError::CannotConnect),
format!("distributed runtime: {e}"),
)
})?;
tracing::debug!("distributed runtime connected");
let component = drt
.namespace(&self.config.namespace)
.and_then(|ns| ns.component(&self.config.component))
.map_err(|e| {
err(
ErrorType::Backend(BackendError::CannotConnect),
format!("component: {e}"),
)
})?;
let endpoint = component.endpoint(&self.config.endpoint);
tracing::debug!(
namespace = %self.config.namespace,
component = %self.config.component,
endpoint = %self.config.endpoint,
"component and endpoint resolved"
);
// Nothing has been started yet, so there is nothing to tear down.
if shutdown.is_cancelled() {
tracing::info!("Shutdown signal observed before engine.start(); exiting cleanly");
return Ok(());
}
// The runtime's connection id doubles as the worker id given to the engine.
let worker_id = drt.connection_id();
let engine_config = self.start_engine(worker_id).await?;
tracing::debug!(
model = %engine_config.model,
worker_id,
"engine.start() complete"
);
// The engine is up but serving never began: run the shutdown steps directly.
if shutdown.is_cancelled() {
tracing::info!("Shutdown signal observed during engine.start(); running orchestrator");
self.orchestrator_steps(&endpoint).await;
return Ok(());
}
self.serve_with_orchestrator(&engine_config, endpoint, shutdown.clone())
.await
}
async fn orchestrator_steps(&mut self, endpoint: &dynamo_runtime::component::Endpoint) {
if let Err(e) = endpoint.unregister_endpoint_instance().await {
tracing::warn!(error = %e, "discovery unregister failed");
} else {
tracing::info!("Endpoint unregistered from discovery");
}
self.run_engine_shutdown_steps().await;
}
/// Starts the engine and records the outcome in the lifecycle state.
///
/// # Panics
/// Panics when called in any state other than `Init`.
///
/// # Errors
/// Propagates the error from `engine.start()`; the state is then `StartFailed`.
async fn start_engine(&mut self, worker_id: u64) -> Result<EngineConfig, DynamoError> {
    assert_eq!(
        self.state,
        LifecycleState::Init,
        "start_engine called in unexpected state {:?}",
        self.state
    );
    let started = self.engine.start(worker_id).await;
    self.state = if started.is_ok() {
        LifecycleState::Running
    } else {
        LifecycleState::StartFailed
    };
    started
}
async fn cleanup_once(&mut self) {
match self.state {
LifecycleState::Init | LifecycleState::Stopped => {
self.state = LifecycleState::Stopped;
return;
}
LifecycleState::Running | LifecycleState::StartFailed => {}
}
match self.engine.cleanup().await {
Ok(()) => tracing::info!("Engine cleanup complete"),
Err(e) => tracing::error!(error = %e, "engine cleanup failed"),
}
self.state = LifecycleState::Stopped;
}
/// Registers the model, serves requests, and runs shutdown orchestration.
///
/// Builds and attaches the local model, wraps the engine in an ingress
/// handler, and starts the endpoint. When the endpoint finishes with `Ok` or
/// the shutdown token fires, `orchestrator_steps` runs and `Ok(())` is
/// returned.
///
/// # Errors
/// Fails when the model type cannot be resolved, or when building/attaching
/// the model, creating the ingress, or serving fails. A serve error returns
/// immediately without running `orchestrator_steps`.
async fn serve_with_orchestrator(
&mut self,
engine_config: &EngineConfig,
endpoint: dynamo_runtime::component::Endpoint,
shutdown: CancellationToken,
) -> Result<(), DynamoError> {
let model_type = resolve_model_type(&self.config)?;
let mut local_model = build_local_model(&self.config, engine_config).await?;
tracing::debug!("local model built");
local_model
.attach(&endpoint, model_type, self.config.model_input, None)
.await
.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("model attach: {e}"),
)
})?;
tracing::debug!("model registered with discovery");
// Name used for logging only: configured/engine served name, else the model id.
let served = resolve_served_name(&self.config, engine_config)
.unwrap_or_else(|| engine_config.model.clone());
tracing::info!(
"Serving {} on {}.{}.{}",
served,
self.config.namespace,
self.config.component,
self.config.endpoint
);
let ingress = Ingress::for_engine(Arc::new(EngineAdapter::new(
self.engine.clone(),
self.config.disaggregation_mode,
)))
.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("ingress: {e}"),
)
})?;
// An empty label list is passed as `None`.
let metrics_labels = if self.config.metrics_labels.is_empty() {
None
} else {
Some(self.config.metrics_labels.clone())
};
// NOTE(review): presumably this guard makes the runtime's graceful shutdown
// wait for the orchestration below until it is dropped — confirm.
let _orchestrator_registration = endpoint.drt().register_graceful_task();
let serve_fut = endpoint
.endpoint_builder()
.handler(ingress)
.metrics_labels(metrics_labels)
.graceful_shutdown(true)
.start();
tokio::pin!(serve_fut);
// `biased` polls the serve future first, so its result wins over the
// shutdown token when both are ready.
tokio::select! {
biased;
result = &mut serve_fut => {
match result {
Ok(()) => {
tracing::info!(
"Endpoint completed gracefully; running shutdown orchestration"
);
}
Err(e) => {
return Err(err(
ErrorType::Backend(BackendError::Unknown),
format!("serve: {e}"),
));
}
}
}
_ = shutdown.cancelled() => {
tracing::info!("Received shutdown signal; running graceful orchestration");
}
}
self.orchestrator_steps(&endpoint).await;
Ok(())
}
/// Runs the engine shutdown steps using the grace period from the environment.
async fn run_engine_shutdown_steps(&mut self) {
    let grace = grace_period_secs();
    self.run_engine_shutdown_steps_with_grace(grace).await;
}
async fn run_engine_shutdown_steps_with_grace(&mut self, grace: f64) {
if grace > 0.0 {
tracing::info!("Grace period {:.2}s before drain", grace);
tokio::time::sleep(Duration::from_secs_f64(grace)).await;
}
if let Err(e) = self.engine.drain().await {
tracing::warn!(error = %e, "engine drain failed");
}
self.cleanup_once().await;
}
}
fn graceful_shutdown_timeout() -> Duration {
use dynamo_runtime::config::environment_names::worker as env_worker;
use dynamo_runtime::worker::{
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG, DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE,
};
let default = if cfg!(debug_assertions) {
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
} else {
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
};
let secs = std::env::var(env_worker::DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(default);
Duration::from_secs(secs)
}
/// Computes the overall shutdown deadline: `timeout` plus the grace period.
///
/// `grace_secs` values that are not greater than zero (negative, zero, NaN)
/// contribute nothing. A positive value too large to represent as a
/// [`Duration`] (including `+inf`) saturates to [`Duration::MAX`] instead of
/// panicking, matching the saturating addition used for the sum.
fn shutdown_deadline(timeout: Duration, grace_secs: f64) -> Duration {
    let grace = if grace_secs > 0.0 {
        // Only overflow or +inf can fail here: NaN and negatives are excluded
        // by the comparison above.
        Duration::try_from_secs_f64(grace_secs).unwrap_or(Duration::MAX)
    } else {
        Duration::ZERO
    };
    timeout.saturating_add(grace)
}
fn grace_period_secs() -> f64 {
match std::env::var(GRACE_PERIOD_ENV) {
Ok(s) if !s.is_empty() => match s.parse::<f64>() {
Ok(v) if v >= 0.0 => v,
Ok(_) => 0.0,
Err(_) => {
tracing::warn!(
"Invalid {}={:?}; using default {}",
GRACE_PERIOD_ENV,
s,
DEFAULT_GRACE_PERIOD_SECS
);
DEFAULT_GRACE_PERIOD_SECS
}
},
_ => DEFAULT_GRACE_PERIOD_SECS,
}
}
/// Shorthand for building a [`DynamoError`] from an error type and a message.
fn err(error_type: ErrorType, message: impl Into<String>) -> DynamoError {
    let builder = DynamoError::builder()
        .error_type(error_type)
        .message(message);
    builder.build()
}
/// Picks the served model name: the worker config's value when present,
/// otherwise the engine-reported one (which may itself be `None`).
fn resolve_served_name(config: &WorkerConfig, engine_config: &EngineConfig) -> Option<String> {
    match config.served_model_name.as_ref() {
        Some(configured) => Some(configured.clone()),
        None => engine_config.served_model_name.clone(),
    }
}
/// Determines the model type to register: prefill workers are always
/// `ModelType::Prefill`; every other mode parses `endpoint_types`.
///
/// # Errors
/// Propagates parse failures from `parse_endpoint_types`.
fn resolve_model_type(config: &WorkerConfig) -> Result<ModelType, DynamoError> {
    if config.disaggregation_mode.is_prefill() {
        Ok(ModelType::Prefill)
    } else {
        parse_endpoint_types(&config.endpoint_types)
    }
}
/// Parses a comma-separated list of endpoint types into a `ModelType` set.
///
/// Entries are trimmed and matched case-insensitively (ASCII); blank entries
/// are skipped. Accepted names: `chat`, `completions`, `embedding(s)`,
/// `tensor`, `prefill`.
///
/// # Errors
/// `InvalidArgument` on the first unknown name, or when no entry remains
/// after skipping blanks.
fn parse_endpoint_types(s: &str) -> Result<ModelType, DynamoError> {
    let mut combined = ModelType::empty();
    let mut seen_any = false;
    let names = s
        .split(',')
        .map(|part| part.trim().to_ascii_lowercase())
        .filter(|name| !name.is_empty());
    for name in names {
        combined |= match name.as_str() {
            "chat" => ModelType::Chat,
            "completions" => ModelType::Completions,
            "embedding" | "embeddings" => ModelType::Embedding,
            "tensor" => ModelType::TensorBased,
            "prefill" => ModelType::Prefill,
            other => {
                return Err(err(
                    ErrorType::Backend(BackendError::InvalidArgument),
                    format!("unknown endpoint type '{other}'"),
                ));
            }
        };
        seen_any = true;
    }
    if seen_any {
        Ok(combined)
    } else {
        Err(err(
            ErrorType::Backend(BackendError::InvalidArgument),
            "endpoint_types cannot be empty",
        ))
    }
}
/// Accepts only `ModelInput::Tokens`.
///
/// # Errors
/// `InvalidArgument` naming the rejected input kind for anything else.
fn validate_model_input(model_input: ModelInput) -> Result<(), DynamoError> {
    if model_input != ModelInput::Tokens {
        let message = format!(
            "dynamo_backend_common::Worker currently supports only ModelInput::Tokens; got '{}'. \
             ModelInput::Text and ModelInput::Tensor require dedicated raw-request adapters.",
            model_input.as_str()
        );
        return Err(err(
            ErrorType::Backend(BackendError::InvalidArgument),
            message,
        ));
    }
    Ok(())
}
/// Assembles the `LocalModel` that is later attached to the endpoint.
///
/// Combines the worker configuration with the values reported by the engine:
/// served name, context length, KV block size, template path, and a
/// `ModelRuntimeConfig` carrying capacity numbers, parser settings, the
/// local-indexer flag, and the optional disaggregated endpoint. When
/// `config.model_name` is non-empty it is used as a local path if it exists,
/// otherwise it is fetched.
///
/// # Errors
/// `InvalidArgument` when the existence check on the model path fails,
/// `CannotConnect` when fetching fails, `Unknown` when the builder fails.
async fn build_local_model(
config: &WorkerConfig,
engine_config: &EngineConfig,
) -> Result<LocalModel, DynamoError> {
// Falls back to the engine's model id; an empty result becomes `None`.
// NOTE(review): an empty configured served name therefore yields `None`
// rather than falling back to the engine's model id — confirm intended.
let served_name = resolve_served_name(config, engine_config)
.or_else(|| Some(engine_config.model.clone()))
.filter(|s| !s.is_empty());
// Decode workers never enable the local indexer, whatever the config says.
let enable_local_indexer =
config.enable_local_indexer && !config.disaggregation_mode.is_decode();
// Published only when the engine reports both a bootstrap host and port.
let disaggregated_endpoint = match (&engine_config.bootstrap_host, engine_config.bootstrap_port)
{
(Some(host), Some(port)) => {
tracing::info!(
bootstrap_host = %host,
bootstrap_port = port,
"Publishing disaggregated_endpoint for prefill worker"
);
Some(DisaggregatedEndpoint {
bootstrap_host: Some(host.clone()),
bootstrap_port: Some(port),
})
}
_ => None,
};
let rt_cfg = ModelRuntimeConfig {
total_kv_blocks: engine_config.total_kv_blocks,
max_num_seqs: engine_config.max_num_seqs,
max_num_batched_tokens: engine_config.max_num_batched_tokens,
tool_call_parser: config.tool_call_parser.clone(),
reasoning_parser: config.reasoning_parser.clone(),
exclude_tools_when_tool_choice_none: config.exclude_tools_when_tool_choice_none,
enable_local_indexer,
disaggregated_endpoint,
..ModelRuntimeConfig::default()
};
let mut builder = LocalModelBuilder::default();
builder
.model_name(served_name)
.context_length(engine_config.context_length)
.kv_cache_block_size(engine_config.kv_cache_block_size)
.custom_template_path(config.custom_jinja_template.clone())
.runtime_config(rt_cfg);
if !config.model_name.is_empty() {
let source = config.model_name.clone();
// NOTE(review): `std::fs::exists` is a blocking call inside an async fn.
let local_path = if std::fs::exists(&source).map_err(|e| {
err(
ErrorType::Backend(BackendError::InvalidArgument),
format!("model path: {e}"),
)
})? {
PathBuf::from(&source)
} else {
LocalModel::fetch(&source, false).await.map_err(|e| {
err(
ErrorType::Backend(BackendError::CannotConnect),
format!("fetch '{source}': {e}"),
)
})?
};
builder.model_path(local_path);
// The original identifier is kept as the source path even after a fetch.
builder.source_path(PathBuf::from(source));
}
builder.build().await.map_err(|e| {
err(
ErrorType::Backend(BackendError::Unknown),
format!("build local model: {e}"),
)
})
}
#[cfg(test)]
mod tests {
use super::*;
/// Extracts the error type from a result that is expected to be an `Err`.
fn error_type_of(result: Result<ModelType, DynamoError>) -> ErrorType {
    let failure = result.unwrap_err();
    failure.error_type()
}
/// Two known types combine into one flag set.
#[test]
fn parse_endpoint_types_happy_path() {
    let expected = ModelType::Chat | ModelType::Completions;
    let parsed = parse_endpoint_types("chat,completions").unwrap();
    assert_eq!(parsed, expected);
}
/// Each single name maps to exactly its own flag.
#[test]
fn parse_endpoint_types_single() {
    let cases = [
        ("chat", ModelType::Chat),
        ("completions", ModelType::Completions),
        ("embedding", ModelType::Embedding),
    ];
    for (input, expected) in cases {
        assert_eq!(parse_endpoint_types(input).unwrap(), expected);
    }
}
/// Surrounding whitespace and letter case are ignored.
#[test]
fn parse_endpoint_types_trims_and_lowercases() {
    let parsed = parse_endpoint_types(" Chat , COMPLETIONS ").unwrap();
    let expected = ModelType::Chat | ModelType::Completions;
    assert_eq!(parsed, expected);
}
/// Empty input and input made only of blank entries are both rejected.
#[test]
fn parse_endpoint_types_rejects_empty() {
    for blank in ["", " , "] {
        assert_eq!(
            error_type_of(parse_endpoint_types(blank)),
            ErrorType::Backend(BackendError::InvalidArgument)
        );
    }
}
/// An unknown name fails with `InvalidArgument` and is quoted in the message.
#[test]
fn parse_endpoint_types_rejects_unknown() {
    let failure = parse_endpoint_types("chat,bogus").unwrap_err();
    let expected_kind = ErrorType::Backend(BackendError::InvalidArgument);
    assert_eq!(failure.error_type(), expected_kind);
    let rendered = failure.to_string();
    assert!(rendered.contains("bogus"));
}
/// Token input is the one supported kind.
#[test]
fn validate_model_input_accepts_tokens() {
    let outcome = validate_model_input(ModelInput::Tokens);
    outcome.unwrap();
}
/// Text and tensor inputs are rejected and named in the error message.
#[test]
fn validate_model_input_rejects_text_and_tensor() {
    let unsupported = [ModelInput::Text, ModelInput::Tensor];
    for input in unsupported {
        let failure = validate_model_input(input).unwrap_err();
        assert_eq!(
            failure.error_type(),
            ErrorType::Backend(BackendError::InvalidArgument)
        );
        let rendered = failure.to_string();
        assert!(rendered.contains(input.as_str()));
    }
}
/// Capacity numbers and parser settings reach the published runtime config.
#[tokio::test]
async fn build_local_model_carries_runtime_parser_settings() {
    let engine_config = EngineConfig {
        model: "nvidia/Kimi-K2.5-NVFP4".to_string(),
        total_kv_blocks: Some(100),
        max_num_seqs: Some(16),
        max_num_batched_tokens: Some(8192),
        ..EngineConfig::default()
    };
    let config = WorkerConfig {
        tool_call_parser: Some("kimi_k2".to_string()),
        reasoning_parser: Some("kimi_k25".to_string()),
        exclude_tools_when_tool_choice_none: false,
        enable_local_indexer: false,
        ..WorkerConfig::default()
    };

    let local_model = build_local_model(&config, &engine_config).await.unwrap();
    let published = local_model.runtime_config();

    // Values reported by the engine.
    assert_eq!(published.total_kv_blocks, Some(100));
    assert_eq!(published.max_num_seqs, Some(16));
    assert_eq!(published.max_num_batched_tokens, Some(8192));
    // Values taken from the worker configuration.
    assert_eq!(published.tool_call_parser.as_deref(), Some("kimi_k2"));
    assert_eq!(published.reasoning_parser.as_deref(), Some("kimi_k25"));
    assert!(!published.exclude_tools_when_tool_choice_none);
    assert!(!published.enable_local_indexer);
}
/// Aggregated workers derive the model type from `endpoint_types`.
#[test]
fn resolve_model_type_aggregated_uses_endpoint_types() {
    let mut config = WorkerConfig::default();
    config.endpoint_types = "chat,completions".to_string();
    config.disaggregation_mode = DisaggregationMode::Aggregated;

    let resolved = resolve_model_type(&config).unwrap();
    assert_eq!(resolved, ModelType::Chat | ModelType::Completions,);
}
/// Decode workers also derive the model type from `endpoint_types`.
#[test]
fn resolve_model_type_decode_uses_endpoint_types() {
    let mut config = WorkerConfig::default();
    config.endpoint_types = "chat".to_string();
    config.disaggregation_mode = DisaggregationMode::Decode;

    let resolved = resolve_model_type(&config).unwrap();
    assert_eq!(resolved, ModelType::Chat);
}
/// Prefill workers register as `Prefill` regardless of `endpoint_types`.
#[test]
fn resolve_model_type_prefill_overrides_endpoint_types() {
    let mut config = WorkerConfig::default();
    config.endpoint_types = "chat,completions".to_string();
    config.disaggregation_mode = DisaggregationMode::Prefill;

    let resolved = resolve_model_type(&config).unwrap();
    assert_eq!(resolved, ModelType::Prefill);
}
/// A decode worker never publishes the local indexer, even when requested.
#[tokio::test]
async fn build_local_model_decode_disables_local_indexer() {
    let engine_config = EngineConfig {
        model: "test/model".to_string(),
        ..EngineConfig::default()
    };
    let config = WorkerConfig {
        enable_local_indexer: true,
        disaggregation_mode: DisaggregationMode::Decode,
        ..WorkerConfig::default()
    };
    let local_model = build_local_model(&config, &engine_config).await.unwrap();
    let indexer_enabled = local_model.runtime_config().enable_local_indexer;
    assert!(!indexer_enabled);
}
/// An aggregated worker keeps the local indexer when it is requested.
#[tokio::test]
async fn build_local_model_aggregated_keeps_local_indexer() {
    let engine_config = EngineConfig {
        model: "test/model".to_string(),
        ..EngineConfig::default()
    };
    let config = WorkerConfig {
        enable_local_indexer: true,
        disaggregation_mode: DisaggregationMode::Aggregated,
        ..WorkerConfig::default()
    };
    let local_model = build_local_model(&config, &engine_config).await.unwrap();
    let indexer_enabled = local_model.runtime_config().enable_local_indexer;
    assert!(indexer_enabled);
}
/// Bootstrap host and port reported by the engine are published together.
#[tokio::test]
async fn build_local_model_publishes_disaggregated_endpoint_when_engine_provides_it() {
    let engine_config = EngineConfig {
        model: "test/model".to_string(),
        bootstrap_host: Some("10.0.0.5".to_string()),
        bootstrap_port: Some(12345),
        ..EngineConfig::default()
    };
    let config = WorkerConfig {
        disaggregation_mode: DisaggregationMode::Prefill,
        ..WorkerConfig::default()
    };
    let local_model = build_local_model(&config, &engine_config).await.unwrap();
    let runtime_config = local_model.runtime_config();
    let published = runtime_config
        .disaggregated_endpoint
        .as_ref()
        .expect("disaggregated_endpoint must be published");
    assert_eq!(published.bootstrap_host.as_deref(), Some("10.0.0.5"));
    assert_eq!(published.bootstrap_port, Some(12345));
}
/// Without bootstrap details from the engine nothing is published.
#[tokio::test]
async fn build_local_model_skips_disaggregated_endpoint_when_engine_omits_it() {
    let engine_config = EngineConfig {
        model: "test/model".to_string(),
        ..EngineConfig::default()
    };
    let config = WorkerConfig::default();
    let local_model = build_local_model(&config, &engine_config).await.unwrap();
    let published = &local_model.runtime_config().disaggregated_endpoint;
    assert!(published.is_none());
}
use crate::engine::PreprocessedRequest;
use async_trait::async_trait;
use futures::stream::BoxStream;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Test double with a configurable `start` outcome and a `cleanup` counter.
struct StateMockEngine {
    start_should_fail: bool,
    cleanup_calls: Arc<AtomicUsize>,
}

impl StateMockEngine {
    /// Returns the engine together with a handle onto its cleanup counter.
    fn new(start_should_fail: bool) -> (Arc<Self>, Arc<AtomicUsize>) {
        let counter = Arc::new(AtomicUsize::new(0));
        let engine = Self {
            start_should_fail,
            cleanup_calls: Arc::clone(&counter),
        };
        (Arc::new(engine), counter)
    }
}
/// `start` fails or succeeds as configured; `cleanup` only counts its calls.
#[async_trait]
impl LLMEngine for StateMockEngine {
    async fn start(&self, _worker_id: u64) -> Result<EngineConfig, DynamoError> {
        if !self.start_should_fail {
            return Ok(EngineConfig {
                model: "mock".to_string(),
                ..EngineConfig::default()
            });
        }
        Err(err(
            ErrorType::Backend(BackendError::EngineShutdown),
            "synthetic start failure",
        ))
    }

    async fn generate(
        &self,
        _request: PreprocessedRequest,
        _ctx: crate::engine::GenerateContext,
    ) -> Result<
        BoxStream<'static, Result<crate::engine::LLMEngineOutput, DynamoError>>,
        DynamoError,
    > {
        unreachable!("not used in state machine tests")
    }

    async fn cleanup(&self) -> Result<(), DynamoError> {
        let _previous = self.cleanup_calls.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}
/// Builds a worker around `engine` with the default configuration.
fn worker_with(engine: Arc<dyn LLMEngine>) -> Worker {
    let config = WorkerConfig::default();
    Worker::new(engine, config)
}
/// A successful start moves the worker from `Init` to `Running`.
#[tokio::test]
async fn start_engine_init_to_running_on_success() {
    let (engine, _cleanup_calls) = StateMockEngine::new(false);
    let mut worker = worker_with(engine);
    let engine_config = worker.start_engine(0).await.expect("start");
    assert_eq!(engine_config.model, "mock");
    assert_eq!(worker.state, LifecycleState::Running);
}
/// A failing start moves the worker from `Init` to `StartFailed`.
#[tokio::test]
async fn start_engine_init_to_start_failed_on_failure() {
    let (engine, _cleanup_calls) = StateMockEngine::new(true);
    let mut worker = worker_with(engine);
    let outcome = worker.start_engine(0).await;
    assert!(outcome.is_err(), "start should fail");
    assert_eq!(worker.state, LifecycleState::StartFailed);
}
/// After a failed start, cleanup runs exactly once even when requested twice.
#[tokio::test]
async fn cleanup_once_runs_engine_cleanup_after_failed_start() {
    let (engine, cleanup_calls) = StateMockEngine::new(true);
    let mut worker = worker_with(engine);
    let _ = worker.start_engine(0).await;

    // First request performs the cleanup.
    worker.cleanup_once().await;
    assert_eq!(
        cleanup_calls.load(Ordering::SeqCst),
        1,
        "engine.cleanup() must run exactly once after a failed start \
         so engines don't have to re-implement the guard"
    );
    assert_eq!(worker.state, LifecycleState::Stopped);

    // Second request is a no-op.
    worker.cleanup_once().await;
    assert_eq!(cleanup_calls.load(Ordering::SeqCst), 1);
    assert_eq!(worker.state, LifecycleState::Stopped);
}
/// Repeated cleanup requests after a successful start call the engine once.
#[tokio::test]
async fn cleanup_once_is_idempotent() {
    let (engine, cleanup_calls) = StateMockEngine::new(false);
    let mut worker = worker_with(engine);
    worker.start_engine(0).await.unwrap();
    for _attempt in 0..3 {
        worker.cleanup_once().await;
    }
    assert_eq!(cleanup_calls.load(Ordering::SeqCst), 1);
    assert_eq!(worker.state, LifecycleState::Stopped);
}
/// A worker that never started skips engine cleanup but still ends `Stopped`.
#[tokio::test]
async fn cleanup_once_noops_when_never_started() {
    let (engine, cleanup_calls) = StateMockEngine::new(false);
    let mut worker = worker_with(engine);
    worker.cleanup_once().await;
    let observed_calls = cleanup_calls.load(Ordering::SeqCst);
    assert_eq!(observed_calls, 0);
    assert_eq!(worker.state, LifecycleState::Stopped);
}
use std::sync::Mutex as StdMutex;
/// Test double that records the order of lifecycle calls in a shared log.
struct OrderingMockEngine {
    log: Arc<StdMutex<Vec<&'static str>>>,
    drain_should_fail: bool,
}

impl OrderingMockEngine {
    /// Returns the engine together with a handle onto its call log.
    fn new(drain_should_fail: bool) -> (Arc<Self>, Arc<StdMutex<Vec<&'static str>>>) {
        let shared_log = Arc::new(StdMutex::new(Vec::new()));
        let engine = Self {
            log: Arc::clone(&shared_log),
            drain_should_fail,
        };
        (Arc::new(engine), shared_log)
    }
}
/// Every lifecycle call appends its name to the log; `drain` can be made to fail.
#[async_trait]
impl LLMEngine for OrderingMockEngine {
    async fn start(&self, _worker_id: u64) -> Result<EngineConfig, DynamoError> {
        self.log.lock().unwrap().push("start");
        let config = EngineConfig {
            model: "mock".to_string(),
            ..EngineConfig::default()
        };
        Ok(config)
    }

    async fn generate(
        &self,
        _request: PreprocessedRequest,
        _ctx: crate::engine::GenerateContext,
    ) -> Result<
        BoxStream<'static, Result<crate::engine::LLMEngineOutput, DynamoError>>,
        DynamoError,
    > {
        unreachable!("not used in orchestrator tests")
    }

    async fn drain(&self) -> Result<(), DynamoError> {
        self.log.lock().unwrap().push("drain");
        if !self.drain_should_fail {
            return Ok(());
        }
        Err(err(
            ErrorType::Backend(BackendError::Unknown),
            "synthetic drain failure",
        ))
    }

    async fn cleanup(&self) -> Result<(), DynamoError> {
        self.log.lock().unwrap().push("cleanup");
        Ok(())
    }
}
/// With no grace period, the engine sees start, drain, cleanup in that order.
#[tokio::test]
async fn shutdown_steps_run_drain_before_cleanup() {
    let (engine, log) = OrderingMockEngine::new(false);
    let mut worker = worker_with(engine);
    worker.start_engine(0).await.unwrap();
    worker.run_engine_shutdown_steps_with_grace(0.0).await;

    let calls = log.lock().unwrap().clone();
    let expected = vec!["start", "drain", "cleanup"];
    assert_eq!(calls, expected, "drain must run before cleanup");
}
/// A failing drain is tolerated: cleanup still runs and the worker stops.
#[tokio::test]
async fn shutdown_steps_drain_failure_does_not_block_cleanup() {
    let (engine, log) = OrderingMockEngine::new(true);
    let mut worker = worker_with(engine);
    worker.start_engine(0).await.unwrap();
    worker.run_engine_shutdown_steps_with_grace(0.0).await;

    let calls = log.lock().unwrap().clone();
    assert_eq!(calls, vec!["start", "drain", "cleanup"]);
    assert_eq!(worker.state, LifecycleState::Stopped);
}
use std::sync::Mutex;
/// Serializes the tests in this module that mutate process environment variables.
static ENV_LOCK: Mutex<()> = Mutex::new(());
/// Runs `f` with the environment variable `key` temporarily set to `value`
/// (removed when `value` is `None`) and returns what `f` returns.
///
/// The previous state of the variable is restored afterwards, including when
/// `f` panics (for example on a failed assertion), so one failing test cannot
/// leak its environment into the next. `ENV_LOCK` is held for the whole call;
/// a poisoned lock is reused rather than treated as an error.
fn with_env<F: FnOnce() -> R, R>(key: &str, value: Option<&str>, f: F) -> R {
    /// Puts one environment variable back to its saved state when dropped.
    struct Restore<'a> {
        key: &'a str,
        prev: Option<String>,
    }

    impl<'a> Drop for Restore<'a> {
        fn drop(&mut self) {
            // SAFETY: runs while `ENV_LOCK` is still held (this guard is
            // declared after the lock guard, so it is dropped first), which
            // serializes every environment mutation made through this helper.
            unsafe {
                match self.prev.take() {
                    Some(v) => std::env::set_var(self.key, v),
                    None => std::env::remove_var(self.key),
                }
            }
        }
    }

    let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let _restore = Restore {
        key,
        prev: std::env::var(key).ok(),
    };
    // SAFETY: `ENV_LOCK` is held, serializing mutations made via this helper.
    unsafe {
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
    }
    f()
}
/// An unset variable yields the default grace period.
#[test]
fn grace_period_default_when_unset() {
    let observed = with_env(GRACE_PERIOD_ENV, None, grace_period_secs);
    assert_eq!(observed, DEFAULT_GRACE_PERIOD_SECS);
}
/// A well-formed float is returned as-is.
#[test]
fn grace_period_parses_valid_value() {
    let observed = with_env(GRACE_PERIOD_ENV, Some("2.5"), grace_period_secs);
    assert_eq!(observed, 2.5);
}
/// Negative values are clamped to zero.
#[test]
fn grace_period_clamps_negative_to_zero() {
    let observed = with_env(GRACE_PERIOD_ENV, Some("-1"), grace_period_secs);
    assert_eq!(observed, 0.0);
}
/// Text that is not a number falls back to the default.
#[test]
fn grace_period_falls_back_to_default_on_parse_error() {
    let observed = with_env(GRACE_PERIOD_ENV, Some("not-a-number"), grace_period_secs);
    assert_eq!(observed, DEFAULT_GRACE_PERIOD_SECS);
}
/// An empty value behaves like an unset variable.
#[test]
fn grace_period_treats_empty_as_unset() {
    let observed = with_env(GRACE_PERIOD_ENV, Some(""), grace_period_secs);
    assert_eq!(observed, DEFAULT_GRACE_PERIOD_SECS);
}
/// Test-local alias for the runtime's graceful-shutdown-timeout variable name.
const SHUTDOWN_TIMEOUT_ENV: &str =
dynamo_runtime::config::environment_names::worker::DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT;
fn expected_default_timeout_secs() -> u64 {
if cfg!(debug_assertions) {
dynamo_runtime::worker::DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
} else {
dynamo_runtime::worker::DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
}
}
/// An unset variable yields the build-profile default.
#[test]
fn shutdown_timeout_default_when_unset() {
    let observed = with_env(SHUTDOWN_TIMEOUT_ENV, None, graceful_shutdown_timeout);
    let expected = Duration::from_secs(expected_default_timeout_secs());
    assert_eq!(observed, expected);
}
/// A whole number of seconds is honoured.
#[test]
fn shutdown_timeout_parses_valid_value() {
    let observed = with_env(SHUTDOWN_TIMEOUT_ENV, Some("42"), graceful_shutdown_timeout);
    assert_eq!(observed, Duration::from_secs(42));
}
/// Text that is not a number falls back to the build-profile default.
#[test]
fn shutdown_timeout_falls_back_to_default_on_parse_error() {
    let observed = with_env(
        SHUTDOWN_TIMEOUT_ENV,
        Some("not-a-number"),
        graceful_shutdown_timeout,
    );
    let expected = Duration::from_secs(expected_default_timeout_secs());
    assert_eq!(observed, expected);
}
/// An empty value behaves like an unset variable.
#[test]
fn shutdown_timeout_treats_empty_as_unset() {
    let observed = with_env(SHUTDOWN_TIMEOUT_ENV, Some(""), graceful_shutdown_timeout);
    let expected = Duration::from_secs(expected_default_timeout_secs());
    assert_eq!(observed, expected);
}
/// The deadline is the timeout plus the grace period, fractions included.
#[test]
fn shutdown_deadline_adds_grace_to_timeout() {
    let cases = [
        (Duration::from_secs(5), 5.0, Duration::from_secs(10)),
        (Duration::from_secs(30), 0.0, Duration::from_secs(30)),
        (Duration::from_secs(2), 0.5, Duration::from_millis(2_500)),
    ];
    for (timeout, grace, expected) in cases {
        assert_eq!(shutdown_deadline(timeout, grace), expected);
    }
}
/// A negative grace period leaves the timeout unchanged.
#[test]
fn shutdown_deadline_clamps_negative_grace() {
    let timeout = Duration::from_secs(5);
    let deadline = shutdown_deadline(timeout, -1.0);
    assert_eq!(deadline, Duration::from_secs(5));
}
/// Demonstrates the failure mode the deadline helper guards against: bounding
/// the shutdown steps by the bare timeout expires during the grace pause, so
/// neither drain nor cleanup is reached.
#[tokio::test(start_paused = true)]
async fn timeout_alone_starves_drain_cleanup_when_grace_meets_timeout() {
    let (engine, log) = OrderingMockEngine::new(false);
    let mut worker = worker_with(engine);
    worker.start_engine(0).await.unwrap();

    let grace = 5.1;
    let timeout = Duration::from_secs(5);
    let shutdown = worker.run_engine_shutdown_steps_with_grace(grace);
    let outcome = tokio::time::timeout(timeout, shutdown).await;
    assert!(
        outcome.is_err(),
        "buggy deadline must expire before drain/cleanup run"
    );

    let calls = log.lock().unwrap().clone();
    assert_eq!(
        calls,
        vec!["start"],
        "drain and cleanup must not have been observed"
    );
}
/// With the grace period added to the deadline, the shutdown steps finish:
/// drain and cleanup are both observed.
#[tokio::test(start_paused = true)]
async fn shutdown_deadline_reserves_grace_so_drain_cleanup_complete() {
    let (engine, log) = OrderingMockEngine::new(false);
    let mut worker = worker_with(engine);
    worker.start_engine(0).await.unwrap();

    let grace = 5.1;
    let deadline = shutdown_deadline(Duration::from_secs(5), grace);
    let shutdown = worker.run_engine_shutdown_steps_with_grace(grace);
    let outcome = tokio::time::timeout(deadline, shutdown).await;
    assert!(
        outcome.is_ok(),
        "fixed deadline must allow drain + cleanup to finish"
    );

    let calls = log.lock().unwrap().clone();
    assert_eq!(calls, vec!["start", "drain", "cleanup"]);
}
/// Every populated field is written to its `DYN_*` variable; the previous
/// values are put back at the end of the test.
#[test]
fn runtime_config_apply_to_env_writes_set_fields() {
    const KEYS: [&str; 3] = [
        "DYN_DISCOVERY_BACKEND",
        "DYN_REQUEST_PLANE",
        "DYN_EVENT_PLANE",
    ];
    let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let cfg = RuntimeConfig {
        discovery_backend: Some("file".to_string()),
        request_plane: Some("tcp".to_string()),
        event_plane: Some("zmq".to_string()),
    };
    let saved: Vec<(&str, Option<String>)> = KEYS
        .iter()
        .map(|&key| (key, std::env::var(key).ok()))
        .collect();

    cfg.apply_to_env();

    assert_eq!(std::env::var("DYN_DISCOVERY_BACKEND").unwrap(), "file");
    assert_eq!(std::env::var("DYN_REQUEST_PLANE").unwrap(), "tcp");
    assert_eq!(std::env::var("DYN_EVENT_PLANE").unwrap(), "zmq");

    for (key, previous) in saved {
        // SAFETY: `ENV_LOCK` is held, serializing env mutation among the tests
        // in this module that take it.
        unsafe {
            if let Some(value) = previous {
                std::env::set_var(key, value);
            } else {
                std::env::remove_var(key);
            }
        }
    }
}
/// A field left as `None` must not overwrite a variable that already exists.
#[test]
fn runtime_config_apply_to_env_leaves_unset_fields_untouched() {
    const KEY: &str = "DYN_REQUEST_PLANE";
    let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
    let previous = std::env::var(KEY).ok();
    // SAFETY: `ENV_LOCK` is held, serializing env mutation among the tests in
    // this module that take it.
    unsafe { std::env::set_var(KEY, "preexisting") };

    let cfg = RuntimeConfig {
        discovery_backend: Some("etcd".to_string()),
        request_plane: None,
        event_plane: None,
    };
    cfg.apply_to_env();
    assert_eq!(std::env::var(KEY).unwrap(), "preexisting");

    // SAFETY: as above.
    unsafe {
        if let Some(value) = previous {
            std::env::set_var(KEY, value);
        } else {
            std::env::remove_var(KEY);
        }
    }
}
}