dynamo_runtime/
engine.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc};
17
18pub use async_trait::async_trait;
19use futures::stream::Stream;
20
/// Marker trait for values that can be used as [`AsyncEngine`] request and response types.
///
/// It has no methods and is never implemented by hand: the blanket implementation that
/// accompanies it covers every [`Send`] + [`Sync`] + `'static` type.
pub trait Data: Send + Sync + 'static {}

// Blanket implementation: any type meeting the supertrait bounds is `Data`.
impl<T> Data for T where T: Send + Sync + 'static {}
24
25/// [`DataStream`] is a type alias for a stream of [`Data`] items. This can be adapted to a [`ResponseStream`]
26/// by associating it with a [`AsyncEngineContext`].
27pub type DataUnary<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
28pub type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
29
30pub type Engine<Req, Resp, E> = Arc<dyn AsyncEngine<Req, Resp, E>>;
31pub type EngineUnary<Resp> = Pin<Box<dyn AsyncEngineUnary<Resp>>>;
32pub type EngineStream<Resp> = Pin<Box<dyn AsyncEngineStream<Resp>>>;
33pub type Context = Arc<dyn AsyncEngineContext>;
34
35impl<T: Data> From<EngineStream<T>> for DataStream<T> {
36    fn from(stream: EngineStream<T>) -> Self {
37        Box::pin(stream)
38    }
39}
40
// TODO: presumably the controller and the context are meant to be separated once trait
// upcasting (https://github.com/rust-lang/rust/issues/65991) becomes stable - confirm intent.
/// Marker trait for the controller side of an engine; it currently declares no methods.
pub trait AsyncEngineController
where
    Self: Send + Sync,
{
}
43
44/// The [`AsyncEngineContext`] trait defines the interface to control the resulting stream
45/// produced by the engine.
46#[async_trait]
47pub trait AsyncEngineContext: Send + Sync + Debug {
48    /// Unique ID for the Stream
49    fn id(&self) -> &str;
50
51    /// Returns true if `stop_generating()` has been called; otherwise, false.
52    fn is_stopped(&self) -> bool;
53
54    /// Returns true if `kill()` has been called; otherwise, false.
55    /// This can be used with a `.take_while()` stream combinator to immediately terminate
56    /// the stream.
57    ///
58    /// An ideal location for a `[.take_while(!ctx.is_killed())]` stream combinator is on
59    /// the most downstream  return stream.
60    fn is_killed(&self) -> bool;
61
62    /// Calling this method when [`AsyncEngineContext::is_stopped`] is `true` will return
63    /// immediately; otherwise, it will [`AsyncEngineContext::is_stopped`] will return true.
64    async fn stopped(&self);
65
66    /// Calling this method when [`AsyncEngineContext::is_killed`] is `true` will return
67    /// immediately; otherwise, it will [`AsyncEngineContext::is_killed`] will return true.
68    async fn killed(&self);
69
70    // Controller
71
72    /// Informs the [`AsyncEngine`] to stop producing results for this particular stream.
73    /// This method is idempotent. This method does not invalidate results current in the
74    /// stream. It might take some time for the engine to stop producing results. The caller
75    /// can decided to drain the stream or drop the stream.
76    fn stop_generating(&self);
77
78    /// See [`AsyncEngineContext::stop_generating`].
79    fn stop(&self);
80
81    /// Extends the [`AsyncEngineContext::stop_generating`] also indicates a preference to
82    /// terminate without draining the remaining items in the stream. This is implementation
83    /// specific and may not be supported by all engines.
84    fn kill(&self);
85}
86
87pub trait AsyncEngineContextProvider: Send + Sync + Debug {
88    fn context(&self) -> Arc<dyn AsyncEngineContext>;
89}
90
91pub trait AsyncEngineUnary<Resp: Data>:
92    Future<Output = Resp> + AsyncEngineContextProvider + Send + Sync
93{
94}
95
96pub trait AsyncEngineStream<Resp: Data>:
97    Stream<Item = Resp> + AsyncEngineContextProvider + Send + Sync
98{
99}
100
101/// Engine is a trait that defines the interface for a steaming LLM completion engine.
102/// The synchronous Engine version is does not need to be awaited.
103#[async_trait]
104pub trait AsyncEngine<Req: Data, Resp: Data + AsyncEngineContextProvider, E: Data>:
105    Send + Sync
106{
107    /// Generate a stream of completion responses.
108    async fn generate(&self, request: Req) -> Result<Resp, E>;
109}
110
111/// Adapter for a [`DataStream`] to a [`ResponseStream`].
112///
113/// A common pattern is to consume the [`ResponseStream`] with standard stream combinators
114/// which produces a [`DataStream`] stream, then form a [`ResponseStream`] by propagating the
115/// original [`AsyncEngineContext`].
116pub struct ResponseStream<R: Data> {
117    stream: DataStream<R>,
118    ctx: Arc<dyn AsyncEngineContext>,
119}
120
121impl<R: Data> ResponseStream<R> {
122    pub fn new(stream: DataStream<R>, ctx: Arc<dyn AsyncEngineContext>) -> Pin<Box<Self>> {
123        Box::pin(Self { stream, ctx })
124    }
125}
126
127impl<R: Data> Stream for ResponseStream<R> {
128    type Item = R;
129
130    #[inline]
131    fn poll_next(
132        mut self: Pin<&mut Self>,
133        cx: &mut std::task::Context<'_>,
134    ) -> std::task::Poll<Option<Self::Item>> {
135        Pin::new(&mut self.stream).poll_next(cx)
136    }
137}
138
139impl<R: Data> AsyncEngineStream<R> for ResponseStream<R> {}
140
141impl<R: Data> AsyncEngineContextProvider for ResponseStream<R> {
142    fn context(&self) -> Arc<dyn AsyncEngineContext> {
143        self.ctx.clone()
144    }
145}
146
147impl<R: Data> Debug for ResponseStream<R> {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct("ResponseStream")
150            // todo: add debug for stream - possibly propagate some information about what
151            // engine created the stream
152            // .field("stream", &self.stream)
153            .field("ctx", &self.ctx)
154            .finish()
155    }
156}
157
158impl<T: Data> AsyncEngineContextProvider for Pin<Box<dyn AsyncEngineUnary<T>>> {
159    fn context(&self) -> Arc<dyn AsyncEngineContext> {
160        AsyncEngineContextProvider::context(&**self)
161    }
162}
163
164impl<T: Data> AsyncEngineContextProvider for Pin<Box<dyn AsyncEngineStream<T>>> {
165    fn context(&self) -> Arc<dyn AsyncEngineContext> {
166        AsyncEngineContextProvider::context(&**self)
167    }
168}