dynamo_runtime/engine.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Asynchronous Engine System with Type Erasure Support
5//!
6//! This module provides the core asynchronous engine abstraction for Dynamo's runtime system.
7//! It defines the `AsyncEngine` trait for streaming engines and provides sophisticated
8//! type-erasure capabilities for managing heterogeneous engine collections.
9//!
10//! ## Type Erasure Overview
11//!
12//! Type erasure is a critical feature that allows storing different `AsyncEngine` implementations
13//! with varying generic type parameters in a single collection (e.g., `HashMap<String, Arc<dyn AnyAsyncEngine>>`).
14//! This is essential for:
15//!
16//! - **Dynamic Engine Management**: Registering and retrieving engines at runtime based on configuration
17//! - **Plugin Systems**: Loading different engine implementations without compile-time knowledge
18//! - **Service Discovery**: Managing multiple engine types in a unified registry
19//!
20//! ## Implementation Details
21//!
22//! The type-erasure system uses several advanced Rust features:
23//!
24//! - **Trait Objects (`dyn Trait`)**: For runtime polymorphism without compile-time type information
25//! - **`std::any::TypeId`**: For runtime type checking during downcasting
26//! - **`std::any::Any`**: For type-erased storage and safe downcasting
27//! - **`PhantomData`**: For maintaining type relationships in generic wrappers
28//! - **Extension Traits**: For ergonomic API design without modifying existing types
29//!
30//! ## Safety Considerations
31//!
32//! ⚠️ **IMPORTANT**: The type-erasure system relies on precise type matching at runtime.
33//! When modifying these traits or their implementations:
34//!
35//! - **Never change the type ID logic** in `AnyAsyncEngine` implementations
36//! - **Maintain the blanket `Data` implementation** for all `Send + Sync + 'static` types
37//! - **Test downcasting thoroughly** when adding new engine types
38//! - **Document any changes** that affect the type-erasure behavior
39//!
40//! ## Usage Example
41//!
42//! ```rust,ignore
43//! use std::collections::HashMap;
44//! use std::sync::Arc;
//! use crate::engine::{AnyAsyncEngine, AsAnyAsyncEngine, AsyncEngine, DowncastAnyAsyncEngine};
46//!
47//! // Create typed engines
48//! let string_engine: Arc<dyn AsyncEngine<String, String, ()>> = Arc::new(MyStringEngine::new());
49//! let int_engine: Arc<dyn AsyncEngine<i32, i32, ()>> = Arc::new(MyIntEngine::new());
50//!
51//! // Store in heterogeneous collection
52//! let mut engines: HashMap<String, Arc<dyn AnyAsyncEngine>> = HashMap::new();
53//! engines.insert("string".to_string(), string_engine.into_any_engine());
54//! engines.insert("int".to_string(), int_engine.into_any_engine());
55//!
56//! // Retrieve and downcast safely
57//! if let Some(typed_engine) = engines.get("string").unwrap().downcast::<String, String, ()>() {
58//! let result = typed_engine.generate("hello".to_string()).await;
59//! }
60//! ```
61
62use std::{
63 any::{Any, TypeId},
64 fmt::Debug,
65 future::Future,
66 marker::PhantomData,
67 pin::Pin,
68 sync::Arc,
69};
70
71pub use async_trait::async_trait;
72use futures::stream::Stream;
73
/// Marker trait for values that may travel through an [`AsyncEngine`] as a request,
/// response, or error payload.
///
/// The only requirements are [`Send`] + [`Sync`] + `'static`. The blanket implementation
/// below already covers every type meeting those bounds, so this trait must not be
/// implemented by hand — a manual implementation would overlap with the blanket one.
pub trait Data: Send + Sync + 'static {}

impl<T> Data for T where T: Send + Sync + 'static {}
80
81/// [`DataStream`] is a type alias for a stream of [`Data`] items. This can be adapted to a [`ResponseStream`]
82/// by associating it with a [`AsyncEngineContext`].
83pub type DataUnary<T> = Pin<Box<dyn Future<Output = T> + Send>>;
84pub type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
85
86pub type Engine<Req, Resp, E> = Arc<dyn AsyncEngine<Req, Resp, E>>;
87pub type EngineUnary<Resp> = Pin<Box<dyn AsyncEngineUnary<Resp>>>;
88pub type EngineStream<Resp> = Pin<Box<dyn AsyncEngineStream<Resp>>>;
89pub type Context = Arc<dyn AsyncEngineContext>;
90
91impl<T: Data> From<EngineStream<T>> for DataStream<T> {
92 fn from(stream: EngineStream<T>) -> Self {
93 Box::pin(stream)
94 }
95}
96
// NOTE(review): the previous note here was the fragment "The Controller and the Context when
// https://github.com/rust-lang/rust/issues/65991 becomes stable". Presumably the control
// methods of `AsyncEngineContext` are meant to be split out onto this trait once that
// language feature (trait object upcasting) can be relied on — confirm with the authors.
/// Marker trait for engine controllers; it currently declares no methods.
pub trait AsyncEngineController: Send + Sync {}
99
100/// The [`AsyncEngineContext`] trait defines the interface to control the resulting stream
101/// produced by the engine.
102///
103/// This trait provides lifecycle management for async operations, including:
104/// - Stream identification via unique IDs
105/// - Graceful shutdown capabilities (`stop_generating`)
106/// - Immediate termination capabilities (`kill`)
107/// - Status checking for stopped/killed states
108///
109/// Implementations should ensure thread-safety and proper state management
110/// across concurrent access patterns.
111#[async_trait]
112pub trait AsyncEngineContext: Send + Sync + Debug {
113 /// Unique ID for the Stream
114 fn id(&self) -> &str;
115
116 /// Returns true if `stop_generating()` has been called; otherwise, false.
117 fn is_stopped(&self) -> bool;
118
119 /// Returns true if `kill()` has been called; otherwise, false.
120 /// This can be used with a `.take_while()` stream combinator to immediately terminate
121 /// the stream.
122 ///
123 /// An ideal location for a `[.take_while(!ctx.is_killed())]` stream combinator is on
124 /// the most downstream return stream.
125 fn is_killed(&self) -> bool;
126
127 /// Calling this method when [`AsyncEngineContext::is_stopped`] is `true` will return
128 /// immediately; otherwise, it will [`AsyncEngineContext::is_stopped`] will return true.
129 async fn stopped(&self);
130
131 /// Calling this method when [`AsyncEngineContext::is_killed`] is `true` will return
132 /// immediately; otherwise, it will [`AsyncEngineContext::is_killed`] will return true.
133 async fn killed(&self);
134
135 // Controller
136
137 /// Informs the [`AsyncEngine`] to stop producing results for this particular stream.
138 /// This method is idempotent. This method does not invalidate results current in the
139 /// stream. It might take some time for the engine to stop producing results. The caller
140 /// can decided to drain the stream or drop the stream.
141 fn stop_generating(&self);
142
143 /// See [`AsyncEngineContext::stop_generating`].
144 fn stop(&self);
145
146 /// Extends the [`AsyncEngineContext::stop_generating`] also indicates a preference to
147 /// terminate without draining the remaining items in the stream. This is implementation
148 /// specific and may not be supported by all engines.
149 fn kill(&self);
150
151 /// Links child AsyncEngineContext to this AsyncEngineContext. If the `stop_generating`, `stop`
152 /// or `kill` on this AsyncEngineContext is called, the same method is called on all linked
153 /// child AsyncEngineContext, in the order they are linked, and then the method on this
154 /// AsyncEngineContext continues.
155 fn link_child(&self, child: Arc<dyn AsyncEngineContext>);
156}
157
158/// Provides access to the [`AsyncEngineContext`] associated with an engine operation.
159///
160/// This trait is implemented by both unary and streaming engine results, allowing
161/// uniform access to context information regardless of the operation type.
162pub trait AsyncEngineContextProvider: Send + Debug {
163 fn context(&self) -> Arc<dyn AsyncEngineContext>;
164}
165
166/// A unary (single-response) asynchronous engine operation.
167///
168/// This trait combines `Future` semantics with context provider capabilities,
169/// representing a single async operation that produces one result.
170pub trait AsyncEngineUnary<Resp: Data>:
171 Future<Output = Resp> + AsyncEngineContextProvider + Send
172{
173}
174
175/// A streaming asynchronous engine operation.
176///
177/// This trait combines `Stream` semantics with context provider capabilities,
178/// representing a continuous async operation that produces multiple results over time.
179pub trait AsyncEngineStream<Resp: Data>:
180 Stream<Item = Resp> + AsyncEngineContextProvider + Send
181{
182}
183
184/// Engine is a trait that defines the interface for a streaming engine.
185/// The synchronous Engine version is does not need to be awaited.
186///
187/// This is the core trait for all async engine implementations. It provides:
188/// - Generic type parameters for request, response, and error types
189/// - Async generation capabilities with proper error handling
190/// - Thread-safe design with `Send + Sync` bounds
191///
192/// ## Type Parameters
193/// - `Req`: The request type that implements `Data`
194/// - `Resp`: The response type that implements both `Data` and `AsyncEngineContextProvider`
195/// - `E`: The error type that implements `Data`
196///
197/// ## Implementation Notes
198/// Implementations should ensure proper error handling and resource management.
199/// The `generate` method should be cancellable via the response's context provider.
200#[async_trait]
201pub trait AsyncEngine<Req: Send + Sync + 'static, Resp: AsyncEngineContextProvider, E: Data>:
202 Send + Sync
203{
204 /// Generate a stream of completion responses.
205 async fn generate(&self, request: Req) -> Result<Resp, E>;
206}
207
208/// Adapter for a [`DataStream`] to a [`ResponseStream`].
209///
210/// A common pattern is to consume the [`ResponseStream`] with standard stream combinators
211/// which produces a [`DataStream`] stream, then form a [`ResponseStream`] by propagating the
212/// original [`AsyncEngineContext`].
213pub struct ResponseStream<R: Data> {
214 stream: DataStream<R>,
215 ctx: Arc<dyn AsyncEngineContext>,
216}
217
218impl<R: Data> ResponseStream<R> {
219 pub fn new(stream: DataStream<R>, ctx: Arc<dyn AsyncEngineContext>) -> Pin<Box<Self>> {
220 Box::pin(Self { stream, ctx })
221 }
222}
223
224impl<R: Data> Stream for ResponseStream<R> {
225 type Item = R;
226
227 #[inline]
228 fn poll_next(
229 mut self: Pin<&mut Self>,
230 cx: &mut std::task::Context<'_>,
231 ) -> std::task::Poll<Option<Self::Item>> {
232 Pin::new(&mut self.stream).poll_next(cx)
233 }
234}
235
236impl<R: Data> AsyncEngineStream<R> for ResponseStream<R> {}
237
238impl<R: Data> AsyncEngineContextProvider for ResponseStream<R> {
239 fn context(&self) -> Arc<dyn AsyncEngineContext> {
240 self.ctx.clone()
241 }
242}
243
244impl<R: Data> Debug for ResponseStream<R> {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 f.debug_struct("ResponseStream")
247 // todo: add debug for stream - possibly propagate some information about what
248 // engine created the stream
249 // .field("stream", &self.stream)
250 .field("ctx", &self.ctx)
251 .finish()
252 }
253}
254
255impl<T: Data> AsyncEngineContextProvider for Pin<Box<dyn AsyncEngineUnary<T>>> {
256 fn context(&self) -> Arc<dyn AsyncEngineContext> {
257 AsyncEngineContextProvider::context(&**self)
258 }
259}
260
261impl<T: Data> AsyncEngineContextProvider for Pin<Box<dyn AsyncEngineStream<T>>> {
262 fn context(&self) -> Arc<dyn AsyncEngineContext> {
263 AsyncEngineContextProvider::context(&**self)
264 }
265}
266
/// An `AsyncEngine` whose request, response and error types have been erased.
///
/// Erasing the generic parameters lets engines with different signatures live side by
/// side in one collection. The original parameters remain observable at runtime as
/// [`TypeId`]s, which is what makes a checked downcast back to
/// `AsyncEngine<Req, Resp, E>` possible.
///
/// ## Guarantees expected from implementations
/// - The reported type IDs are those of the engine's original `Req`, `Resp` and `E`.
/// - A downcast succeeds only for exactly that combination of types.
/// - A mismatching downcast yields `None`; it does not panic.
///
/// ## Implementation Notes
/// Within this module the trait is implemented by the private `AnyEngineWrapper`. Do not
/// implement it directly; use the `AsAnyAsyncEngine` extension trait to obtain a
/// type-erased engine.
pub trait AnyAsyncEngine: Send + Sync {
    /// [`TypeId`] of the request type the underlying engine accepts.
    fn request_type_id(&self) -> TypeId;

    /// [`TypeId`] of the response type the underlying engine produces.
    fn response_type_id(&self) -> TypeId;

    /// [`TypeId`] of the error type the underlying engine can fail with.
    fn error_type_id(&self) -> TypeId;

    /// Exposes the underlying engine as `&dyn Any` so that it can be downcast.
    fn as_any(&self) -> &dyn Any;
}
298
299/// An internal wrapper to hold a typed `AsyncEngine` behind the `AnyAsyncEngine` trait object.
300///
301/// This struct uses `PhantomData<fn(Req, Resp, E)>` to maintain the type relationship
302/// without storing the types directly, enabling the type-erasure mechanism.
303///
304/// ## PhantomData Usage
305/// The `PhantomData<fn(Req, Resp, E)>` ensures that the compiler knows about the
306/// generic type parameters without requiring them to be `'static`, which would
307/// prevent storing non-static types in the engine.
308struct AnyEngineWrapper<Req, Resp, E>
309where
310 Req: Data,
311 Resp: Data + AsyncEngineContextProvider,
312 E: Data,
313{
314 engine: Arc<dyn AsyncEngine<Req, Resp, E>>,
315 _phantom: PhantomData<fn(Req, Resp, E)>,
316}
317
318impl<Req, Resp, E> AnyAsyncEngine for AnyEngineWrapper<Req, Resp, E>
319where
320 Req: Data,
321 Resp: Data + AsyncEngineContextProvider,
322 E: Data,
323{
324 fn request_type_id(&self) -> TypeId {
325 TypeId::of::<Req>()
326 }
327
328 fn response_type_id(&self) -> TypeId {
329 TypeId::of::<Resp>()
330 }
331
332 fn error_type_id(&self) -> TypeId {
333 TypeId::of::<E>()
334 }
335
336 fn as_any(&self) -> &dyn Any {
337 &self.engine
338 }
339}
340
341/// An extension trait that provides a convenient way to type-erase an `AsyncEngine`.
342///
343/// This trait provides the `.into_any_engine()` method on any `Arc<dyn AsyncEngine<...>>`,
344/// enabling ergonomic type erasure without explicit wrapper construction.
345///
346/// ## Usage
347/// ```rust,ignore
348/// use crate::engine::AsAnyAsyncEngine;
349///
350/// let typed_engine: Arc<dyn AsyncEngine<String, String, ()>> = Arc::new(MyEngine::new());
351/// let any_engine = typed_engine.into_any_engine();
352/// ```
353pub trait AsAnyAsyncEngine {
354 /// Converts a typed `AsyncEngine` into a type-erased `AnyAsyncEngine`.
355 fn into_any_engine(self) -> Arc<dyn AnyAsyncEngine>;
356}
357
358impl<Req, Resp, E> AsAnyAsyncEngine for Arc<dyn AsyncEngine<Req, Resp, E>>
359where
360 Req: Data,
361 Resp: Data + AsyncEngineContextProvider,
362 E: Data,
363{
364 fn into_any_engine(self) -> Arc<dyn AnyAsyncEngine> {
365 Arc::new(AnyEngineWrapper {
366 engine: self,
367 _phantom: PhantomData,
368 })
369 }
370}
371
372/// An extension trait that provides a convenient method to downcast an `AnyAsyncEngine`.
373///
374/// This trait provides the `.downcast<Req, Resp, E>()` method on `Arc<dyn AnyAsyncEngine>`,
375/// enabling safe downcasting back to the original typed engine.
376///
377/// ## Safety
378/// The downcast method performs runtime type checking using `TypeId` comparison.
379/// It will only succeed if the type parameters exactly match the original engine's types.
380///
381/// ## Usage
382/// ```rust,ignore
383/// use crate::engine::DowncastAnyAsyncEngine;
384///
385/// let any_engine: Arc<dyn AnyAsyncEngine> = // ... from collection
386/// if let Some(typed_engine) = any_engine.downcast::<String, String, ()>() {
387/// // Use the typed engine
388/// let result = typed_engine.generate("hello".to_string()).await;
389/// }
390/// ```
391pub trait DowncastAnyAsyncEngine {
392 /// Attempts to downcast an `AnyAsyncEngine` to a specific `AsyncEngine` type.
393 ///
394 /// Returns `Some(engine)` if the type parameters match the original engine,
395 /// or `None` if the types don't match.
396 fn downcast<Req, Resp, E>(&self) -> Option<Arc<dyn AsyncEngine<Req, Resp, E>>>
397 where
398 Req: Data,
399 Resp: Data + AsyncEngineContextProvider,
400 E: Data;
401}
402
403impl DowncastAnyAsyncEngine for Arc<dyn AnyAsyncEngine> {
404 fn downcast<Req, Resp, E>(&self) -> Option<Arc<dyn AsyncEngine<Req, Resp, E>>>
405 where
406 Req: Data,
407 Resp: Data + AsyncEngineContextProvider,
408 E: Data,
409 {
410 if self.request_type_id() == TypeId::of::<Req>()
411 && self.response_type_id() == TypeId::of::<Resp>()
412 && self.error_type_id() == TypeId::of::<E>()
413 {
414 self.as_any()
415 .downcast_ref::<Arc<dyn AsyncEngine<Req, Resp, E>>>()
416 .cloned()
417 } else {
418 None
419 }
420 }
421}
422
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    // ---- Fixture types for the engine under test ----

    #[derive(Debug, PartialEq)]
    struct Req1(String);

    #[derive(Debug, PartialEq)]
    struct Resp1(String);

    // The response type must be a context provider to satisfy the engine bounds.
    impl AsyncEngineContextProvider for Resp1 {
        fn context(&self) -> Arc<dyn AsyncEngineContext> {
            // The test below never asks a response for its context.
            unimplemented!()
        }
    }

    #[derive(Debug)]
    struct Err1;

    // ---- A second, unrelated set of types used for the failing downcast ----

    #[derive(Debug)]
    struct Req2;

    #[derive(Debug)]
    struct Resp2;

    impl AsyncEngineContextProvider for Resp2 {
        fn context(&self) -> Arc<dyn AsyncEngineContext> {
            unimplemented!()
        }
    }

    // ---- Mock engine: replies with the request text prefixed by "response to " ----

    struct MockEngine;

    #[async_trait]
    impl AsyncEngine<Req1, Resp1, Err1> for MockEngine {
        async fn generate(&self, request: Req1) -> Result<Resp1, Err1> {
            let Req1(text) = request;
            Ok(Resp1(format!("response to {}", text)))
        }
    }

    /// Erases a typed engine, checks the recorded type IDs, downcasts it back (successfully
    /// and unsuccessfully), and finally repeats the round trip through a `HashMap`.
    #[tokio::test]
    async fn test_engine_type_erasure_and_downcast() {
        // Start from a typed handle and erase it through the extension trait.
        let concrete: Arc<dyn AsyncEngine<Req1, Resp1, Err1>> = Arc::new(MockEngine);
        let erased = concrete.into_any_engine();

        // The wrapper must report the original type parameters.
        assert_eq!(erased.request_type_id(), TypeId::of::<Req1>());
        assert_eq!(erased.response_type_id(), TypeId::of::<Resp1>());
        assert_eq!(erased.error_type_id(), TypeId::of::<Err1>());

        // Downcasting with the matching parameters succeeds...
        let recovered = erased.downcast::<Req1, Resp1, Err1>();
        assert!(recovered.is_some());

        // ...and the recovered engine is fully usable.
        let reply = recovered
            .unwrap()
            .generate(Req1("hello".to_string()))
            .await;
        assert_eq!(reply.unwrap(), Resp1("response to hello".to_string()));

        // Downcasting with different parameters yields `None`.
        let mismatched = erased.downcast::<Req2, Resp2, Err1>();
        assert!(mismatched.is_none());

        // The erased engine can be stored in, and recovered from, a heterogeneous map.
        let mut registry: HashMap<String, Arc<dyn AnyAsyncEngine>> = HashMap::new();
        registry.insert("mock".to_string(), erased);

        let stored = registry.get("mock").unwrap();
        let from_registry = stored.downcast::<Req1, Resp1, Err1>().unwrap();
        let registry_reply = from_registry.generate(Req1("world".to_string())).await;
        assert_eq!(
            registry_reply.unwrap(),
            Resp1("response to world".to_string())
        );
    }
}