use async_trait::async_trait;
use crate::error::Result;
use crate::outputs::Generation;
use crate::runnables::RunnableStream;
pub trait OutputParser: Send + Sync {
fn parse(&self, text: &str) -> Result<serde_json::Value>;
fn parse_result(&self, result: &[Generation], _partial: bool) -> Result<serde_json::Value> {
if result.is_empty() {
return Err(crate::error::CognisError::OutputParserError {
message: "No generations to parse".into(),
observation: None,
llm_output: None,
});
}
self.parse(&result[0].text)
}
fn get_format_instructions(&self) -> Option<String> {
None
}
fn parser_type(&self) -> &str;
}
/// An [`OutputParser`] that can additionally be applied to a
/// [`RunnableStream`].
#[async_trait]
pub trait TransformOutputParser: OutputParser {
/// Takes ownership of `input` and returns a new stream, or an error.
///
/// NOTE(review): presumably the returned stream carries the parsed form of
/// `input`; the exact contract is left to implementors — confirm.
async fn transform(&self, input: RunnableStream) -> Result<RunnableStream>;
}
pub async fn default_transform_stream(
parser: &dyn OutputParser,
input: RunnableStream,
) -> Result<RunnableStream> {
use futures::StreamExt;
let parser_type = parser.parser_type().to_string();
let mut chunks = Vec::new();
let mut stream = input;
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
chunks.push(match &chunk {
serde_json::Value::String(s) => s.clone(),
other => other.to_string(),
});
}
let accumulated = chunks.join("");
let parsed = parser.parse(&accumulated).map_err(|e| {
crate::error::CognisError::OutputParserError {
message: format!("{} transform failed: {}", parser_type, e),
observation: Some(accumulated),
llm_output: None,
}
})?;
Ok(Box::pin(futures::stream::once(async { Ok(parsed) })))
}