1mod stream;
2
3use core::fmt;
4
5use crate::{prompt::Data, traits::ExecutorError};
6use thiserror;
7use tokio::sync::mpsc;
8
9pub use stream::{OutputStream, StreamSegment};
10pub use tokio_stream::{Stream, StreamExt};
11
12pub enum Output {
16 Immediate(Immediate),
18
19 Stream(OutputStream),
21}
22
23#[derive(Debug, thiserror::Error)]
24#[error("Trying to return a stream on an Immediate output")]
25pub struct NotAStreamError;
26
27impl Output {
28 pub async fn to_immediate(self) -> Result<Immediate, ExecutorError> {
32 match self {
33 Output::Immediate(x) => Ok(x),
34 Output::Stream(x) => Ok(Immediate(x.into_data().await?)),
35 }
36 }
37
38 pub async fn as_stream(self) -> Result<OutputStream, NotAStreamError> {
41 match self {
42 Output::Immediate(_) => Err(NotAStreamError),
43 Output::Stream(x) => Ok(x),
44 }
45 }
46
47 pub fn new_stream() -> (mpsc::UnboundedSender<StreamSegment>, Self) {
49 let (sender, stream) = OutputStream::new();
50
51 (sender, Output::Stream(stream))
52 }
53
54 pub fn from_stream<S>(stream: S) -> Self
55 where
56 S: Stream<Item = StreamSegment> + Send + 'static,
57 {
58 Output::Stream(OutputStream::from_stream(stream))
59 }
60
61 pub fn new_immediate(data: Data<String>) -> Self {
63 Output::Immediate(Immediate(data))
64 }
65}
66
67impl fmt::Display for Output {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 match self {
70 Output::Immediate(Immediate(data)) => data.fmt(f),
71 Output::Stream(_) => write!(f, "<OutputStream>"),
72 }
73 }
74}
75
76pub struct Immediate(Data<String>);
77
78impl Immediate {
79 pub fn get_content(&self) -> &Data<String> {
81 &self.0
82 }
83
84 pub fn as_content(self) -> Data<String> {
85 self.0
86 }
87
88 pub fn primary_textual_output(&self) -> Option<String> {
89 self.get_content().extract_last_body().cloned()
90 }
91}
92
93impl fmt::Display for Immediate {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 self.0.fmt(f)
96 }
97}