dynamo_runtime/engine.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc};
17
18pub use async_trait::async_trait;
19use futures::stream::Stream;
20
/// Marker trait for values that can be used as [`AsyncEngine`] request and response types.
///
/// It is never implemented by hand: the blanket impl below covers every type that is
/// [`Send`] + [`Sync`] + `'static`.
pub trait Data: Send + Sync + 'static {}

impl<T> Data for T where T: Send + Sync + 'static {}
24
25/// [`DataStream`] is a type alias for a stream of [`Data`] items. This can be adapted to a [`ResponseStream`]
26/// by associating it with a [`AsyncEngineContext`].
27pub type DataUnary<T> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
28pub type DataStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
29
30pub type Engine<Req, Resp, E> = Arc<dyn AsyncEngine<Req, Resp, E>>;
31pub type EngineUnary<Resp> = Pin<Box<dyn AsyncEngineUnary<Resp>>>;
32pub type EngineStream<Resp> = Pin<Box<dyn AsyncEngineStream<Resp>>>;
33pub type Context = Arc<dyn AsyncEngineContext>;
34
35impl<T: Data> From<EngineStream<T>> for DataStream<T> {
36 fn from(stream: EngineStream<T>) -> Self {
37 Box::pin(stream)
38 }
39}
40
/// Marker trait for engine controllers; it declares no methods of its own.
// TODO: revisit the Controller and the Context when
// https://github.com/rust-lang/rust/issues/65991 becomes stable.
// NOTE(review): the original note had no verb ("The Controller and the Context when ...
// becomes stable"); presumably the plan is to separate the two - confirm with the author.
pub trait AsyncEngineController
where
    Self: Send + Sync,
{
}
43
44/// The [`AsyncEngineContext`] trait defines the interface to control the resulting stream
45/// produced by the engine.
46#[async_trait]
47pub trait AsyncEngineContext: Send + Sync + Debug {
48 /// Unique ID for the Stream
49 fn id(&self) -> &str;
50
51 /// Returns true if `stop_generating()` has been called; otherwise, false.
52 fn is_stopped(&self) -> bool;
53
54 /// Returns true if `kill()` has been called; otherwise, false.
55 /// This can be used with a `.take_while()` stream combinator to immediately terminate
56 /// the stream.
57 ///
58 /// An ideal location for a `[.take_while(!ctx.is_killed())]` stream combinator is on
59 /// the most downstream return stream.
60 fn is_killed(&self) -> bool;
61
62 /// Calling this method when [`AsyncEngineContext::is_stopped`] is `true` will return
63 /// immediately; otherwise, it will [`AsyncEngineContext::is_stopped`] will return true.
64 async fn stopped(&self);
65
66 /// Calling this method when [`AsyncEngineContext::is_killed`] is `true` will return
67 /// immediately; otherwise, it will [`AsyncEngineContext::is_killed`] will return true.
68 async fn killed(&self);
69
70 // Controller
71
72 /// Informs the [`AsyncEngine`] to stop producing results for this particular stream.
73 /// This method is idempotent. This method does not invalidate results current in the
74 /// stream. It might take some time for the engine to stop producing results. The caller
75 /// can decided to drain the stream or drop the stream.
76 fn stop_generating(&self);
77
78 /// See [`AsyncEngineContext::stop_generating`].
79 fn stop(&self);
80
81 /// Extends the [`AsyncEngineContext::stop_generating`] also indicates a preference to
82 /// terminate without draining the remaining items in the stream. This is implementation
83 /// specific and may not be supported by all engines.
84 fn kill(&self);
85}
86
87pub trait AsyncEngineContextProvider: Send + Sync + Debug {
88 fn context(&self) -> Arc<dyn AsyncEngineContext>;
89}
90
91pub trait AsyncEngineUnary<Resp: Data>:
92 Future<Output = Resp> + AsyncEngineContextProvider + Send + Sync
93{
94}
95
96pub trait AsyncEngineStream<Resp: Data>:
97 Stream<Item = Resp> + AsyncEngineContextProvider + Send + Sync
98{
99}
100
101/// Engine is a trait that defines the interface for a steaming LLM completion engine.
102/// The synchronous Engine version is does not need to be awaited.
103#[async_trait]
104pub trait AsyncEngine<Req: Data, Resp: Data + AsyncEngineContextProvider, E: Data>:
105 Send + Sync
106{
107 /// Generate a stream of completion responses.
108 async fn generate(&self, request: Req) -> Result<Resp, E>;
109}
110
111/// Adapter for a [`DataStream`] to a [`ResponseStream`].
112///
113/// A common pattern is to consume the [`ResponseStream`] with standard stream combinators
114/// which produces a [`DataStream`] stream, then form a [`ResponseStream`] by propagating the
115/// original [`AsyncEngineContext`].
116pub struct ResponseStream<R: Data> {
117 stream: DataStream<R>,
118 ctx: Arc<dyn AsyncEngineContext>,
119}
120
121impl<R: Data> ResponseStream<R> {
122 pub fn new(stream: DataStream<R>, ctx: Arc<dyn AsyncEngineContext>) -> Pin<Box<Self>> {
123 Box::pin(Self { stream, ctx })
124 }
125}
126
127impl<R: Data> Stream for ResponseStream<R> {
128 type Item = R;
129
130 #[inline]
131 fn poll_next(
132 mut self: Pin<&mut Self>,
133 cx: &mut std::task::Context<'_>,
134 ) -> std::task::Poll<Option<Self::Item>> {
135 Pin::new(&mut self.stream).poll_next(cx)
136 }
137}
138
139impl<R: Data> AsyncEngineStream<R> for ResponseStream<R> {}
140
141impl<R: Data> AsyncEngineContextProvider for ResponseStream<R> {
142 fn context(&self) -> Arc<dyn AsyncEngineContext> {
143 self.ctx.clone()
144 }
145}
146
147impl<R: Data> Debug for ResponseStream<R> {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct("ResponseStream")
150 // todo: add debug for stream - possibly propagate some information about what
151 // engine created the stream
152 // .field("stream", &self.stream)
153 .field("ctx", &self.ctx)
154 .finish()
155 }
156}
157
158impl<T: Data> AsyncEngineContextProvider for Pin<Box<dyn AsyncEngineUnary<T>>> {
159 fn context(&self) -> Arc<dyn AsyncEngineContext> {
160 AsyncEngineContextProvider::context(&**self)
161 }
162}
163
164impl<T: Data> AsyncEngineContextProvider for Pin<Box<dyn AsyncEngineStream<T>>> {
165 fn context(&self) -> Arc<dyn AsyncEngineContext> {
166 AsyncEngineContextProvider::context(&**self)
167 }
168}