use std::{
env, fs,
io::{self, Write},
};
use claus::anthropic::{Content, Delta, Message, Role, StreamEvent, StreamingMessage};
use futures::stream::StreamExt;
use reqwest_eventsource::{Event, EventSource};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct Config {
anthropic_api_key: String,
}
#[tokio::main]
async fn main() {
let config_file = env::args()
.nth(1)
.expect("requires argument: path to TOML config file");
let client = reqwest::Client::new();
let config_content = fs::read_to_string(&config_file).expect("failed to read config file");
let config: Config = toml::from_str(&config_content).expect("failed to parse config TOML");
let api = claus::Api::new(config.anthropic_api_key);
let mut messages = im::Vector::new();
while let Some(input) = read_next_line() {
messages.push_back(Message::from_text(Role::User, input));
let http_req = claus::MessagesRequestBuilder::new()
.set_messages(messages.clone())
.stream(true)
.build(&api);
let request_builder = http_req
.try_into_reqwest_builder(&client)
.expect("failed to create request builder");
let mut es = EventSource::new(request_builder).expect("failed to create event source");
let mut assistant_content = Vec::new();
let mut current_text = String::new();
let mut streaming_message: Option<StreamingMessage> = None;
while let Some(event) = es.next().await {
match event {
Ok(Event::Open) => {
}
Ok(Event::Message(message)) => {
match claus::deserialize_event(message.data.as_bytes()) {
Ok(stream_event) => match stream_event {
StreamEvent::MessageStart { message } => {
print!("Assistant: ");
io::stdout().flush().expect("failed to flush stdout");
streaming_message = Some(message);
}
StreamEvent::ContentBlockStart {
index: _,
content_block,
} => {
match content_block {
Content::Text { text } => {
print!("{}", text);
io::stdout().flush().expect("failed to flush stdout");
current_text.push_str(&text);
}
_ => {
eprintln!("Other content block: {:?}", content_block);
}
}
}
StreamEvent::ContentBlockDelta { index, delta } => {
match delta {
Delta::TextDelta { text } => {
print!("{}", text);
io::stdout().flush().expect("failed to flush stdout");
current_text.push_str(&text);
}
other_delta => {
eprintln!(
"Other delta for block {}: {:?}",
index, other_delta
);
}
}
}
StreamEvent::ContentBlockStop { index: _ } => {
if !current_text.is_empty() {
assistant_content
.push(Content::from_text(current_text.clone()));
current_text.clear();
}
}
StreamEvent::MessageDelta { delta, usage: _ } => {
if let Some(ref mut msg) = streaming_message {
msg.update(delta);
} else {
eprintln!(
"Received MessageDelta but no streaming message available"
);
}
}
StreamEvent::MessageStop => {
println!();
break;
}
StreamEvent::Ping => {
}
StreamEvent::Error { error } => {
eprintln!("Error event: {:?}", error);
break;
}
StreamEvent::Unknown {
event_type,
contents,
} => {
eprintln!(
"Unknown event type: {:?}, contents: {:?}",
String::from_utf8_lossy(&event_type),
contents
);
}
},
Err(parse_err) => {
eprintln!("Failed to parse event data: {}", parse_err);
eprintln!("Raw data: {}", message.data);
}
}
}
Err(err) => {
eprintln!("Error: {}", err);
break;
}
}
}
if !assistant_content.is_empty() {
let assistant_message = Message {
role: Role::Assistant,
content: assistant_content,
};
messages.push_back(assistant_message);
}
}
}
fn read_next_line() -> Option<String> {
let stdin = io::stdin();
let mut stdout = io::stdout();
let mut input = String::new();
stdout.write_all(b"You: ").expect("stdout failed to write");
stdout.flush().expect("stdout failed to flush");
if stdin.read_line(&mut input).expect("stdin failed to read") == 0 {
None
} else {
Some(input)
}
}