Skip to main content

ai_chain/output/
mod.rs

1mod stream;
2
3use core::fmt;
4
5use crate::{prompt::Data, traits::ExecutorError};
6use thiserror;
7use tokio::sync::mpsc;
8
9pub use stream::{OutputStream, StreamSegment};
10pub use tokio_stream::{Stream, StreamExt};
11
12/// The `Output` enum provides a general interface for outputs of different types.
13/// The `Immediate` variant represents data that is immediately available, while the `Stream` variant
14/// represents data that may be produced over time.
15pub enum Output {
16    /// Represents immediately available data.
17    Immediate(Immediate),
18
19    /// Represents data that is produced over time.
20    Stream(OutputStream),
21}
22
23#[derive(Debug, thiserror::Error)]
24#[error("Trying to return a stream on an Immediate output")]
25pub struct NotAStreamError;
26
27impl Output {
28    /// Converts the `Output` to its `Immediate` form.
29    /// If the output is `Stream`, it will be consumed and turned into an `Immediate` output.
30    /// This operation is asynchronous as it may need to wait for all data to be produced in the case of a `Stream`.
31    pub async fn to_immediate(self) -> Result<Immediate, ExecutorError> {
32        match self {
33            Output::Immediate(x) => Ok(x),
34            Output::Stream(x) => Ok(Immediate(x.into_data().await?)),
35        }
36    }
37
38    /// Given that the Output is a stream, return a OutputStream
39    /// If the output is `Immediate` NotAStreamError will be raised
40    pub async fn as_stream(self) -> Result<OutputStream, NotAStreamError> {
41        match self {
42            Output::Immediate(_) => Err(NotAStreamError),
43            Output::Stream(x) => Ok(x),
44        }
45    }
46
47    /// Creates a new `Stream` output along with a sender to produce data.
48    pub fn new_stream() -> (mpsc::UnboundedSender<StreamSegment>, Self) {
49        let (sender, stream) = OutputStream::new();
50
51        (sender, Output::Stream(stream))
52    }
53
54    pub fn from_stream<S>(stream: S) -> Self
55    where
56        S: Stream<Item = StreamSegment> + Send + 'static,
57    {
58        Output::Stream(OutputStream::from_stream(stream))
59    }
60
61    /// Creates a new `Immediate` output from the given data.
62    pub fn new_immediate(data: Data<String>) -> Self {
63        Output::Immediate(Immediate(data))
64    }
65}
66
67impl fmt::Display for Output {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        match self {
70            Output::Immediate(Immediate(data)) => data.fmt(f),
71            Output::Stream(_) => write!(f, "<OutputStream>"),
72        }
73    }
74}
75
76pub struct Immediate(Data<String>);
77
78impl Immediate {
79    /// Returns a reference to the content if it is immediately available.
80    pub fn get_content(&self) -> &Data<String> {
81        &self.0
82    }
83
84    pub fn as_content(self) -> Data<String> {
85        self.0
86    }
87
88    pub fn primary_textual_output(&self) -> Option<String> {
89        self.get_content().extract_last_body().cloned()
90    }
91}
92
93impl fmt::Display for Immediate {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        self.0.fmt(f)
96    }
97}