1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! Closes the orphan-rule split for `ChatModel`: provides the
//! `Runnable<Vec<Message>, Message>` impl so chat models drop into
//! `.pipe()` chains, plus the streaming surface so `StreamMode::Messages`
//! emits real token-level deltas.
//!
//! `ChatModel` itself is defined in `entelix-core` (it depends only on
//! `Codec` + `Transport`). This module lives here because
//! `entelix-runnable` owns the `Runnable` trait, and Rust's orphan rule
//! only permits a trait impl in a crate that defines either the trait or
//! the implementing type.
use entelix_core::chat::ChatModel;
use entelix_core::codecs::Codec;
use entelix_core::ir::{Message, Role};
use entelix_core::stream::StreamAggregator;
use entelix_core::transports::Transport;
use entelix_core::{ExecutionContext, Result};
use futures::StreamExt;
use crate::runnable::Runnable;
use crate::stream::{BoxStream, DebugEvent, RunnableEvent, StreamChunk, StreamMode};
#[async_trait::async_trait]
impl<C, T> Runnable<Vec<Message>, Message> for ChatModel<C, T>
where
C: Codec,
T: Transport,
{
async fn invoke(&self, input: Vec<Message>, ctx: &ExecutionContext) -> Result<Message> {
self.complete(input, ctx).await
}
#[allow(tail_expr_drop_order, clippy::too_many_lines)]
async fn stream(
&self,
input: Vec<Message>,
mode: StreamMode,
ctx: &ExecutionContext,
) -> Result<BoxStream<'_, Result<StreamChunk<Message>>>> {
let codec_name = self.codec().name().to_owned();
let model_stream = self.stream_deltas(input, ctx).await?;
// The `ModelStream` carries both a delta-side stream and a
// completion future already wired through `tap_aggregator`
// — but this `Runnable::stream` impl needs per-mode delta
// forwarding (`Messages` mode yields each delta, `Events`
// wraps the lifecycle, `Values` only yields the terminal
// aggregated `Message`). Drain the delta stream directly
// and let our own `StreamAggregator` reconstitute the
// final `ModelResponse` for `Values` mode; the underlying
// `completion` future resolves transparently as the same
// deltas flow through both taps.
let mut delta_stream = model_stream.stream;
Ok(Box::pin(async_stream::stream! {
if matches!(mode, StreamMode::Events) {
yield Ok(StreamChunk::Event(RunnableEvent::Started {
name: codec_name.clone(),
}));
}
let mut aggregator = StreamAggregator::new();
while let Some(item) = delta_stream.next().await {
match item {
Ok(delta) => {
if matches!(mode, StreamMode::Messages) {
yield Ok(StreamChunk::Message(delta.clone()));
}
if let Err(e) = aggregator.push(delta) {
if matches!(mode, StreamMode::Events) {
yield Ok(StreamChunk::Event(
RunnableEvent::Finished {
name: codec_name.clone(),
ok: false,
},
));
}
yield Err(e);
return;
}
}
Err(e) => {
if matches!(mode, StreamMode::Events) {
yield Ok(StreamChunk::Event(RunnableEvent::Finished {
name: codec_name.clone(),
ok: false,
}));
}
yield Err(e);
return;
}
}
}
let response = match aggregator.finalize() {
Ok(r) => r,
Err(e) => {
if matches!(mode, StreamMode::Events) {
yield Ok(StreamChunk::Event(RunnableEvent::Finished {
name: codec_name,
ok: false,
}));
}
yield Err(e);
return;
}
};
let assistant = Message::new(Role::Assistant, response.content);
match mode {
StreamMode::Updates => {
yield Ok(StreamChunk::Update {
node: codec_name,
value: assistant,
});
}
StreamMode::Values | StreamMode::Messages => {
yield Ok(StreamChunk::Value(assistant));
}
StreamMode::Debug => {
yield Ok(StreamChunk::Debug(DebugEvent::NodeStart {
node: codec_name.clone(),
step: 1,
}));
yield Ok(StreamChunk::Value(assistant));
yield Ok(StreamChunk::Debug(DebugEvent::NodeEnd {
node: codec_name,
step: 1,
}));
yield Ok(StreamChunk::Debug(DebugEvent::Final));
}
StreamMode::Events => {
yield Ok(StreamChunk::Value(assistant));
yield Ok(StreamChunk::Event(RunnableEvent::Finished {
name: codec_name,
ok: true,
}));
}
}
}))
}
}