#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.anthropic.com";
const ENV_KEY: &str = "ANTHROPIC_API_KEY";
/// Attaches the API credential to an outgoing request.
///
/// The credential is looked up through `config.get_config_or_env(ENV_KEY)` and
/// sent as the `x-api-key` header.
///
/// # Errors
///
/// Returns an error naming `ENV_KEY` when the lookup yields no value.
fn apply_auth(
    config: &reflow_actor::ActorConfig,
    builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
    match config.get_config_or_env(ENV_KEY) {
        Some(credential) => Ok(builder.header("x-api-key", &credential)),
        None => Err(anyhow::anyhow!("Missing env var: {}", ENV_KEY)),
    }
}
// Sends a message-generation request: `POST /v1/messages`.
//
// Inputs: `model`, `messages`, `max_tokens`, `temperature`, `tools`; each one
// that is present is copied into the JSON request body under the same key.
// Outputs: `response` holds `{status, headers, body}` for any HTTP reply
// (the status code is not inspected); `error` holds a message when the
// request could not be sent.
// A missing credential or a client construction failure is returned as `Err`.
#[actor(
    AnthropicGenerateMessageActor,
    inports::<100>(model, messages, max_tokens, temperature, tools),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn anthropic_generate_message(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();
    let endpoint = "/v1/messages".to_string();
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.post(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;
    // The API requires an `anthropic-version` header on every request, and this
    // actor exposes no inport for it, so the stable version is pinned here.
    builder = builder.header("anthropic-version", "2023-06-01");

    // Copy each supplied input into the body, preserving the declared order.
    let mut body = serde_json::Map::new();
    for key in ["model", "messages", "max_tokens", "temperature", "tools"] {
        if let Some(val) = inputs.get(key) {
            body.insert(key.to_string(), val.clone().into());
        }
    }
    if !body.is_empty() {
        builder = builder.json(&serde_json::Value::Object(body));
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that cannot be read as strings are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Non-JSON replies are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(format!("POST /v1/messages failed: {}", e).into()),
            );
        }
    }
    Ok(output)
}
// Creates a message with the full parameter set: `POST /v1/messages`.
//
// Inputs forwarded as headers when present: `betas` (sent as `anthropic-beta`)
// and `anthropic_version` (sent as `anthropic-version`).
// Inputs copied into the JSON body when present: `model`, `messages`,
// `max_tokens`, `system`, `stream`, `temperature`, `top_p`, `top_k`, `tools`,
// `tool_choice`, `stop_sequences`, `metadata`, `thinking`.
// Outputs: `response` holds `{status, headers, body}` for any HTTP reply
// (the status code is not inspected); `error` holds a message when the
// request could not be sent.
// A missing credential or a client construction failure is returned as `Err`.
#[actor(
    AnthropicCreateMessageActor,
    inports::<100>(model, messages, max_tokens, system, stream, temperature, top_p, top_k, tools, tool_choice, stop_sequences, metadata, thinking, betas, anthropic_version),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn anthropic_create_message(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();
    let endpoint = "/v1/messages".to_string();
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.post(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;
    if let Some(val) = inputs.get("betas") {
        // The API reads beta flags from `anthropic-beta`; a header literally
        // named `betas` is not recognised.
        // NOTE(review): assumes message_to_str yields the comma-separated form
        // the header expects when several betas are supplied — confirm.
        builder = builder.header("anthropic-beta", super::message_to_str(val));
    }
    if let Some(val) = inputs.get("anthropic_version") {
        builder = builder.header("anthropic-version", super::message_to_str(val));
    }

    // Copy each supplied input into the body, preserving the declared order.
    let mut body = serde_json::Map::new();
    for key in [
        "model",
        "messages",
        "max_tokens",
        "system",
        "stream",
        "temperature",
        "top_p",
        "top_k",
        "tools",
        "tool_choice",
        "stop_sequences",
        "metadata",
        "thinking",
    ] {
        if let Some(val) = inputs.get(key) {
            body.insert(key.to_string(), val.clone().into());
        }
    }
    if !body.is_empty() {
        builder = builder.json(&serde_json::Value::Object(body));
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that cannot be read as strings are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Non-JSON replies are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(format!("POST /v1/messages failed: {}", e).into()),
            );
        }
    }
    Ok(output)
}
#[actor(
AnthropicCreateMessageBatchActor,
inports::<100>(requests, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_create_message_batch(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/v1/messages/batches".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("requests") {
body.insert("requests".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /v1/messages/batches failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicListMessageBatchesActor,
inports::<100>(before_id, after_id, limit, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_list_message_batches(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/v1/messages/batches".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("before_id") {
query_pairs.push(("before_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after_id") {
query_pairs.push(("after_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /v1/messages/batches failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicReadMessageBatchActor,
inports::<100>(message_batch_id, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_read_message_batch(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/messages/batches/{message_batch_id}".to_string();
if let Some(val) = inputs.get("message_batch_id") {
endpoint = endpoint.replace("{{message_batch_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /v1/messages/batches/{{message_batch_id}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AnthropicStopMessageBatchActor,
inports::<100>(message_batch_id, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_stop_message_batch(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/messages/batches/{message_batch_id}/cancel".to_string();
if let Some(val) = inputs.get("message_batch_id") {
endpoint = endpoint.replace("{{message_batch_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /v1/messages/batches/{{message_batch_id}}/cancel failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AnthropicDownloadMessageBatchResultsActor,
inports::<100>(message_batch_id, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_download_message_batch_results(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/messages/batches/{message_batch_id}/results".to_string();
if let Some(val) = inputs.get("message_batch_id") {
endpoint = endpoint.replace("{{message_batch_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /v1/messages/batches/{{message_batch_id}}/results failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AnthropicDeleteMessageBatchActor,
inports::<100>(message_batch_id, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_delete_message_batch(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/messages/batches/{message_batch_id}".to_string();
if let Some(val) = inputs.get("message_batch_id") {
endpoint = endpoint.replace("{{message_batch_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /v1/messages/batches/{{message_batch_id}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
AnthropicListModelsActor,
inports::<100>(before_id, after_id, limit, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_list_models(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/v1/models".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("before_id") {
query_pairs.push(("before_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after_id") {
query_pairs.push(("after_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /v1/models failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicReadModelActor,
inports::<100>(model_id, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_read_model(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/models/{model_id}".to_string();
if let Some(val) = inputs.get("model_id") {
endpoint = endpoint.replace("{{model_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /v1/models/{{model_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicUploadFileActor,
inports::<100>(file, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_upload_file(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/v1/files".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("file") {
body.insert("file".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /v1/files failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicListFilesActor,
inports::<100>(before_id, after_id, limit, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_list_files(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/v1/files".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("before_id") {
query_pairs.push(("before_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after_id") {
query_pairs.push(("after_id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /v1/files failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicReadFileActor,
inports::<100>(file_id, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_read_file(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/files/{file_id}".to_string();
if let Some(val) = inputs.get("file_id") {
endpoint = endpoint.replace("{{file_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /v1/files/{{file_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicDeleteFileActor,
inports::<100>(file_id, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_delete_file(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/files/{file_id}".to_string();
if let Some(val) = inputs.get("file_id") {
endpoint = endpoint.replace("{{file_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /v1/files/{{file_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicDownloadFileActor,
inports::<100>(file_id, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_download_file(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/v1/files/{file_id}/content".to_string();
if let Some(val) = inputs.get("file_id") {
endpoint = endpoint.replace("{{file_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /v1/files/{{file_id}}/content failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
AnthropicCreateTokenCountActor,
inports::<100>(model, messages, system, tools, thinking, anthropic_version),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn anthropic_create_token_count(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/v1/messages/count_tokens".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
if let Some(val) = inputs.get("anthropic_version") {
builder = builder.header("anthropic-version", super::message_to_str(val));
}
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("messages") {
body.insert("messages".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("system") {
body.insert("system".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tools") {
body.insert("tools".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("thinking") {
body.insert("thinking".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /v1/messages/count_tokens failed: {}", e).into()),
);
}
}
Ok(output)
}