use std::sync::Arc;
use std::time::Duration;
use dynamo_llm::protocols::common::preprocessor::PreprocessedRequest;
use dynamo_llm::protocols::common::{FinishReason, OutputOptions, SamplingOptions, StopConditions};
use dynamo_runtime::engine::AsyncEngineContext;
use dynamo_runtime::pipeline::{AsyncEngineContextProvider, Context};
use futures::StreamExt;
use crate::engine::{GenerateContext, LLMEngine};
use crate::metrics::{EngineMetrics, TestHierarchy};
use ConformanceFailure::*;
/// Deadline that `run_conformance` hands to `check_cancellation`: how long a
/// cancelled stream is given to finish before `CancellationNotObserved` is reported.
const DEFAULT_CANCEL_DEADLINE: Duration = Duration::from_secs(2);
/// Builds a fresh `Context<()>` and returns the engine-context handle it exposes.
pub fn mock_context() -> Arc<dyn AsyncEngineContext> {
    let request_context = Context::<()>::new(());
    request_context.context()
}
/// Returns a context on which `stop_generating()` is invoked after `after` has elapsed.
///
/// The delayed call runs in a detached task started with `tokio::spawn`, so this
/// must be called from within a Tokio runtime.
pub fn cancelling_context(after: Duration) -> Arc<dyn AsyncEngineContext> {
    let handle = Context::<()>::new(()).context();
    let canceller = handle.clone();
    let delayed_stop = async move {
        tokio::time::sleep(after).await;
        canceller.stop_generating();
    };
    // The join handle is intentionally dropped: the task is fire-and-forget.
    tokio::spawn(delayed_stop);
    handle
}
/// Reasons an engine can fail the conformance run.
///
/// `Clone`, `PartialEq` and `Eq` are derived so that callers can compare a
/// returned failure against an expected variant (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConformanceFailure {
    /// `start()` returned an error; carries the error text.
    StartFailed(String),
    /// `start()` succeeded but the returned config had an empty `model`.
    EmptyModelInConfig,
    /// `generate()` returned an error before producing a stream.
    GenerateFailed(String),
    /// The stream returned by `generate()` ended without yielding anything.
    NoChunksYielded,
    /// A chunk was yielded after a chunk that already carried a finish reason.
    ChunkAfterTerminal,
    /// The stream ended and no chunk carried a finish reason.
    NoTerminalChunk,
    /// The stream yielded an `Err` item; carries the error text.
    StreamYieldedError(String),
    /// One of the concurrently issued `generate()` calls failed.
    ConcurrentGenerateFailed(String),
    /// The stream did not finish within the deadline after cancellation.
    CancellationNotObserved {
        /// The deadline that elapsed.
        after: Duration,
    },
    /// The stream finished, but its last chunk's finish reason was not `Cancelled`.
    CancellationIgnored,
    /// The first `cleanup()` call after the checks returned an error.
    CleanupFailed(String),
    /// The second, back-to-back `cleanup()` call returned an error.
    SecondCleanupFailed(String),
    /// `cleanup()` returned an error on an engine that was never started.
    CleanupWithoutStartFailed(String),
    /// `kv_event_sources()` returned an error.
    KvEventSourcesFailed(String),
    /// Two `kv_event_sources()` calls reported different `dp_rank` sequences.
    KvEventSourcesNotIdempotent,
    /// `setup_metrics()` returned an error.
    SetupMetricsFailed(String),
    /// Two `setup_metrics()` calls reported different `dp_ranks`.
    ComponentMetricsNotIdempotent,
    /// The token count summed over all chunks disagrees with the usage
    /// reported on the terminal chunk.
    CompletionTokensMismatch {
        /// Number of token ids actually streamed across all chunks.
        chunked: usize,
        /// `completion_tokens` reported by the terminal chunk's usage.
        reported: u32,
    },
}
/// Human-readable description of each failure, including the contract the
/// engine is expected to honour where that is not obvious from the name.
impl std::fmt::Display for ConformanceFailure {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Lifecycle: start / generate.
            StartFailed(detail) => write!(f, "start() failed: {detail}"),
            EmptyModelInConfig => f.write_str("EngineConfig.model is empty"),
            GenerateFailed(detail) => write!(f, "generate() failed: {detail}"),

            // Shape of the streamed response.
            NoChunksYielded => f.write_str("generate() stream yielded no chunks"),
            ChunkAfterTerminal => f.write_str("chunk yielded after terminal chunk"),
            NoTerminalChunk => f.write_str("stream ended without a terminal chunk"),
            StreamYieldedError(detail) => write!(f, "engine stream yielded Err: {detail}"),
            ConcurrentGenerateFailed(detail) => {
                write!(f, "concurrent generate() calls failed: {detail}")
            }

            // Cancellation.
            CancellationNotObserved { after: deadline } => write!(
                f,
                "stream did not terminate within {deadline:?} after cancellation"
            ),
            CancellationIgnored => f.write_str(
                "stream terminated but terminal chunk's finish_reason was not Cancelled \
                 (engine must emit FinishReason::Cancelled when it observes cancellation)",
            ),

            // Cleanup.
            CleanupFailed(detail) => write!(f, "cleanup() failed: {detail}"),
            SecondCleanupFailed(detail) => {
                write!(f, "second cleanup() call failed (must be idempotent): {detail}")
            }
            CleanupWithoutStartFailed(detail) => write!(
                f,
                "cleanup() failed on a never-started engine: {detail} \
                 (Worker calls cleanup() after start() raises, so engines must \
                 be null-safe against partial / no allocation)"
            ),

            // KV event sources and metrics.
            KvEventSourcesFailed(detail) => write!(f, "kv_event_sources() failed: {detail}"),
            KvEventSourcesNotIdempotent => f.write_str(
                "kv_event_sources() returned different dp_rank set on a second call \
                 (the descriptor list must be stable for the engine's lifetime)",
            ),
            SetupMetricsFailed(detail) => write!(f, "setup_metrics() failed: {detail}"),
            ComponentMetricsNotIdempotent => f.write_str(
                "setup_metrics().dp_ranks returned different ranks across calls \
                 (the rank set must be stable for the engine's lifetime)",
            ),

            // Usage bookkeeping.
            CompletionTokensMismatch { chunked, reported } => write!(
                f,
                "engine emitted {chunked} tokens across the stream but reported \
                 completion_usage.completion_tokens = {reported} on the terminal \
                 (engine bookkeeping diverges from streamed output)"
            ),
        }
    }
}

/// Marker impl so a failure can be used as a `std::error::Error`.
impl std::error::Error for ConformanceFailure {}
/// Runs the full conformance sequence against engines produced by `factory`.
///
/// Sequence, on the first engine from `factory`:
/// 1. `start(0)` must succeed and return a config with a non-empty `model`.
/// 2. `kv_event_sources`, `setup_metrics`, single generate, concurrent
///    generates and cancellation checks run in that order.
/// 3. `cleanup()` is called twice back-to-back; both calls must succeed.
///
/// Finally a second engine is taken from `factory` and `cleanup()` is called
/// on it without ever starting it.
///
/// # Errors
///
/// Returns the first [`ConformanceFailure`] encountered. If a check fails
/// after `start()` succeeded, `cleanup()` is still attempted on a best-effort
/// basis (its result is ignored) so a failing run does not leave a started
/// engine behind; the original check failure is what gets returned.
pub async fn run_conformance<E, F>(mut factory: F) -> Result<(), ConformanceFailure>
where
    E: LLMEngine,
    F: FnMut() -> E,
{
    let engine = factory();
    let config = engine
        .start(0)
        .await
        .map_err(|e| StartFailed(e.to_string()))?;

    // Everything that needs a started engine is grouped so that a failure in
    // any step can be followed by a cleanup attempt before returning.
    let checks: Result<(), ConformanceFailure> = async {
        if config.model.is_empty() {
            return Err(EmptyModelInConfig);
        }
        check_kv_event_sources(&engine).await?;
        check_setup_metrics(&engine).await?;
        check_single_generate(&engine, &config.model).await?;
        check_concurrent_generates(&engine, &config.model).await?;
        check_cancellation(&engine, &config.model, DEFAULT_CANCEL_DEADLINE).await?;
        Ok(())
    }
    .await;

    if let Err(failure) = checks {
        // Best effort only: the check failure is the more useful diagnostic,
        // so a cleanup error here is deliberately not reported.
        let _ = engine.cleanup().await;
        return Err(failure);
    }

    engine
        .cleanup()
        .await
        .map_err(|e| CleanupFailed(e.to_string()))?;
    // Cleanup must be idempotent: a second call on the same engine must also succeed.
    engine
        .cleanup()
        .await
        .map_err(|e| SecondCleanupFailed(e.to_string()))?;

    // Cleanup must also be safe on an engine that never had start() called.
    let fresh = factory();
    fresh
        .cleanup()
        .await
        .map_err(|e| CleanupWithoutStartFailed(e.to_string()))?;
    Ok(())
}
/// Builds the default test request for `model`, leaving `max_tokens` unset.
fn request(model: &str) -> PreprocessedRequest {
    let max_tokens: Option<u32> = None;
    request_with_max_tokens(model, max_tokens)
}
/// Builds a test request for `model` with token ids `[1, 2, 3]`, default
/// sampling/output options, and the given `max_tokens` stop condition.
///
/// # Panics
///
/// Panics with "build request" if the request builder rejects the inputs.
fn request_with_max_tokens(model: &str, max_tokens: Option<u32>) -> PreprocessedRequest {
    let stop_conditions = StopConditions {
        max_tokens,
        ..Default::default()
    };
    let sampling = SamplingOptions::default();
    let output = OutputOptions::default();

    PreprocessedRequest::builder()
        .model(model.to_owned())
        .token_ids(vec![1, 2, 3])
        .stop_conditions(stop_conditions)
        .sampling_options(sampling)
        .output_options(output)
        .build()
        .expect("build request")
}
/// Issues one `generate()` call and validates the shape of the resulting stream.
///
/// The stream is drained completely, then checked in this order:
/// - it yielded at least one item (`NoChunksYielded`);
/// - no item was an `Err` (`StreamYieldedError`);
/// - exactly one chunk carries a finish reason and it is the last chunk
///   (`NoTerminalChunk` / `ChunkAfterTerminal`);
/// - if the terminal chunk reports completion usage, its `completion_tokens`
///   equals the number of token ids summed over all chunks
///   (`CompletionTokensMismatch`).
async fn check_single_generate<E: LLMEngine>(
    engine: &E,
    model: &str,
) -> Result<(), ConformanceFailure> {
    let engine_ctx = mock_context();
    let stream = engine
        .generate(request(model), GenerateContext::new(engine_ctx, None))
        .await
        .map_err(|e| GenerateFailed(e.to_string()))?;

    // Drain everything first so the emptiness check takes precedence over errors.
    let raw_items: Vec<_> = stream.collect().await;
    if raw_items.is_empty() {
        return Err(NoChunksYielded);
    }
    let chunks = raw_items
        .into_iter()
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| StreamYieldedError(e.to_string()))?;

    // `chunks` is non-empty here, so the subtraction cannot underflow.
    let last_idx = chunks.len() - 1;
    // If the first terminal chunk is also the last chunk, it is the only one;
    // any other position means something followed a terminal chunk.
    let terminal = match chunks.iter().position(|c| c.finish_reason.is_some()) {
        None => return Err(NoTerminalChunk),
        Some(idx) if idx != last_idx => return Err(ChunkAfterTerminal),
        Some(idx) => &chunks[idx],
    };

    if let Some(usage) = terminal.completion_usage.as_ref() {
        let chunked = chunks
            .iter()
            .fold(0usize, |total, c| total + c.token_ids.len());
        if chunked != usage.completion_tokens as usize {
            return Err(CompletionTokensMismatch {
                chunked,
                reported: usage.completion_tokens,
            });
        }
    }
    Ok(())
}
/// Issues `CONCURRENT` `generate()` calls at once and drains every stream.
///
/// Each call fails the check with `ConcurrentGenerateFailed` when:
/// - `generate()` itself returns an error,
/// - the stream yields an `Err` item (previously such items were merely
///   counted as chunks, so a stream consisting only of errors passed), or
/// - the stream yields nothing at all.
///
/// All calls are awaited together; the first failure in issue order is returned.
async fn check_concurrent_generates<E: LLMEngine>(
    engine: &E,
    model: &str,
) -> Result<(), ConformanceFailure> {
    const CONCURRENT: usize = 8;
    let futs = (0..CONCURRENT).map(|_| async {
        let ctx = mock_context();
        let mut stream = engine
            .generate(request(model), GenerateContext::new(ctx, None))
            .await
            .map_err(|e| ConcurrentGenerateFailed(e.to_string()))?;

        let mut yielded = 0usize;
        while let Some(item) = stream.next().await {
            // An Err item is a failure here just as in the single-generate check.
            if let Err(e) = item {
                return Err(ConcurrentGenerateFailed(format!(
                    "stream yielded Err: {e}"
                )));
            }
            yielded += 1;
        }

        if yielded == 0 {
            Err(ConcurrentGenerateFailed("stream was empty".to_string()))
        } else {
            Ok(())
        }
    });
    for result in futures::future::join_all(futs).await {
        result?;
    }
    Ok(())
}
/// Calls `kv_event_sources()` twice and requires both calls to report the
/// same `dp_rank` values in the same order.
///
/// Either call returning an error yields `KvEventSourcesFailed`; differing
/// rank sequences yield `KvEventSourcesNotIdempotent`.
async fn check_kv_event_sources<E: LLMEngine>(engine: &E) -> Result<(), ConformanceFailure> {
    let initial = engine
        .kv_event_sources()
        .await
        .map_err(|e| KvEventSourcesFailed(e.to_string()))?;
    let repeated = engine
        .kv_event_sources()
        .await
        .map_err(|e| KvEventSourcesFailed(e.to_string()))?;

    let initial_ranks: Vec<u32> = initial.iter().map(|src| src.dp_rank()).collect();
    let repeated_ranks: Vec<u32> = repeated.iter().map(|src| src.dp_rank()).collect();

    if initial_ranks == repeated_ranks {
        Ok(())
    } else {
        Err(KvEventSourcesNotIdempotent)
    }
}
/// Calls `setup_metrics()` twice with equivalent contexts and requires both
/// calls to return the same `dp_ranks`.
///
/// Either call returning an error yields `SetupMetricsFailed`; differing
/// ranks yield `ComponentMetricsNotIdempotent`.
async fn check_setup_metrics<E: LLMEngine>(engine: &E) -> Result<(), ConformanceFailure> {
    // The context takes a `'static` reference here, so the metrics object is
    // intentionally leaked for the remainder of the process.
    let metrics: &'static EngineMetrics = Box::leak(Box::new(EngineMetrics::from_hierarchy(
        TestHierarchy::new(),
    )));
    // Both calls receive a context built from the same fixed values.
    let new_ctx = move || crate::engine::MetricsCtx {
        model: "test-model",
        component: "test",
        model_load_time_seconds: 0.0,
        metrics,
    };

    let first = engine
        .setup_metrics(new_ctx())
        .await
        .map_err(|e| SetupMetricsFailed(e.to_string()))?;
    let second = engine
        .setup_metrics(new_ctx())
        .await
        .map_err(|e| SetupMetricsFailed(e.to_string()))?;

    if first.dp_ranks == second.dp_ranks {
        Ok(())
    } else {
        Err(ComponentMetricsNotIdempotent)
    }
}
/// Starts a long generation, cancels it immediately, and checks how the
/// stream ends.
///
/// `stop_generating()` is called on the context right after `generate()`
/// returns, before any chunk is polled. The stream must then finish within
/// `deadline` (`CancellationNotObserved` otherwise), and its last item must be
/// an `Ok` chunk whose finish reason is `FinishReason::Cancelled`:
/// - last item `Ok` with any other finish reason -> `CancellationIgnored`;
/// - last item `Err` -> `StreamYieldedError`;
/// - no items at all -> `NoChunksYielded`.
async fn check_cancellation<E: LLMEngine>(
    engine: &E,
    model: &str,
    deadline: Duration,
) -> Result<(), ConformanceFailure> {
    // Requested so the generation does not end by hitting a small token limit.
    const LONG_MAX_TOKENS: u32 = 10_000;

    let cancel_handle = mock_context();
    let stream = engine
        .generate(
            request_with_max_tokens(model, Some(LONG_MAX_TOKENS)),
            GenerateContext::new(cancel_handle.clone(), None),
        )
        .await
        .map_err(|e| GenerateFailed(e.to_string()))?;
    cancel_handle.stop_generating();

    let drained = tokio::time::timeout(deadline, stream.collect::<Vec<_>>()).await;
    let items = match drained {
        Ok(items) => items,
        Err(_elapsed) => return Err(CancellationNotObserved { after: deadline }),
    };

    // Only the final item decides the outcome.
    match items.last() {
        None => Err(NoChunksYielded),
        Some(Err(e)) => Err(StreamYieldedError(e.to_string())),
        Some(Ok(chunk)) => {
            if matches!(chunk.finish_reason, Some(FinishReason::Cancelled)) {
                Ok(())
            } else {
                Err(CancellationIgnored)
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::{
        EngineConfig, LLMEngineOutput, MetricsBindings, MetricsCtx, PreprocessedRequest,
    };
    use crate::error::DynamoError;
    use async_trait::async_trait;
    use futures::stream::BoxStream;

    /// Stream type returned by `LLMEngine::generate` in these tests.
    type OutputStream = BoxStream<'static, Result<LLMEngineOutput, DynamoError>>;

    /// Engine stub whose `setup_metrics` reports a fixed, caller-chosen rank list.
    /// `generate` is never expected to be called by these tests.
    struct ConfigurableMetricsEngine {
        dp_ranks: Vec<u32>,
    }

    /// Convenience constructor for the stub engine.
    fn engine_reporting(dp_ranks: Vec<u32>) -> ConfigurableMetricsEngine {
        ConfigurableMetricsEngine { dp_ranks }
    }

    #[async_trait]
    impl LLMEngine for ConfigurableMetricsEngine {
        async fn start(&self, _worker_id: u64) -> Result<EngineConfig, DynamoError> {
            let config = EngineConfig {
                model: String::from("mock"),
                ..Default::default()
            };
            Ok(config)
        }

        async fn generate(
            &self,
            _request: PreprocessedRequest,
            _ctx: GenerateContext,
        ) -> Result<OutputStream, DynamoError> {
            unreachable!()
        }

        async fn cleanup(&self) -> Result<(), DynamoError> {
            Ok(())
        }

        async fn setup_metrics(&self, _ctx: MetricsCtx<'_>) -> Result<MetricsBindings, DynamoError> {
            let bindings = MetricsBindings {
                dp_ranks: self.dp_ranks.clone(),
                on_publisher_ready: None,
            };
            Ok(bindings)
        }
    }

    /// An engine that reports no ranks at all must still pass the check.
    #[tokio::test]
    async fn check_setup_metrics_accepts_opt_out() {
        let engine = engine_reporting(Vec::new());
        let result = check_setup_metrics(&engine).await;
        assert!(result.is_ok(), "opt-out should pass: {:?}", result);
    }

    /// An engine that reports the same ranks on every call passes the check.
    #[tokio::test]
    async fn check_setup_metrics_accepts_stable_ranks() {
        let engine = engine_reporting(vec![0, 1, 2]);
        let outcome = check_setup_metrics(&engine).await;
        assert!(outcome.is_ok());
    }
}