#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.openai.com/v1";
const ENV_KEY: &str = "OPENAI_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.header("Authorization", format!("Bearer {}", credential));
Ok(builder)
}
#[actor(
OpenaiGenerateCompletionActor,
inports::<100>(model, messages, temperature, max_tokens, tools, tool_choice),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_generate_completion(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/completions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("messages") {
body.insert("messages".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("temperature") {
body.insert("temperature".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("max_tokens") {
body.insert("max_tokens".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tools") {
body.insert("tools".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tool_choice") {
body.insert("tool_choice".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /chat/completions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiGenerateEmbeddingActor,
inports::<100>(model, input),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_generate_embedding(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/embeddings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("input") {
body.insert("input".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /embeddings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiGenerateImageActor,
inports::<100>(prompt, model, size, quality),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_generate_image(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/images/generations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("prompt") {
body.insert("prompt".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("size") {
body.insert("size".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("quality") {
body.insert("quality".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /images/generations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateResponseActor,
inports::<100>(model, input, instructions, tools, stream, temperature, max_output_tokens, previous_response_id, truncation, background),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_response(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/responses".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("input") {
body.insert("input".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("instructions") {
body.insert("instructions".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tools") {
body.insert("tools".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("stream") {
body.insert("stream".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("temperature") {
body.insert("temperature".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("max_output_tokens") {
body.insert("max_output_tokens".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("previous_response_id") {
body.insert("previous_response_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("truncation") {
body.insert("truncation".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("background") {
body.insert("background".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /responses failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadResponseActor,
inports::<100>(response_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_response(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/responses/{response_id}".to_string();
if let Some(val) = inputs.get("response_id") {
endpoint = endpoint.replace("{{response_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /responses/{{response_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiDeleteResponseActor,
inports::<100>(response_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_delete_response(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/responses/{response_id}".to_string();
if let Some(val) = inputs.get("response_id") {
endpoint = endpoint.replace("{{response_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /responses/{{response_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiListResponseInputItemsActor,
inports::<100>(response_id, limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_response_input_items(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/responses/{response_id}/input_items".to_string();
if let Some(val) = inputs.get("response_id") {
endpoint = endpoint.replace("{{response_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /responses/{{response_id}}/input_items failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateChatCompletionActor,
inports::<100>(model, messages, temperature, max_tokens, stream, tools, tool_choice, response_format, n, top_p),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_chat_completion(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/completions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("messages") {
body.insert("messages".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("temperature") {
body.insert("temperature".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("max_tokens") {
body.insert("max_tokens".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("stream") {
body.insert("stream".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tools") {
body.insert("tools".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("tool_choice") {
body.insert("tool_choice".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("response_format") {
body.insert("response_format".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("n") {
body.insert("n".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("top_p") {
body.insert("top_p".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /chat/completions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadChatCompletionActor,
inports::<100>(completion_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_chat_completion(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/chat/completions/{completion_id}".to_string();
if let Some(val) = inputs.get("completion_id") {
endpoint = endpoint.replace("{{completion_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /chat/completions/{{completion_id}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiListChatCompletionsActor,
inports::<100>(model, limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_chat_completions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/chat/completions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("model") {
query_pairs.push(("model", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /chat/completions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateImageActor,
inports::<100>(prompt, model, n, size, quality, response_format),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_image(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/images/generations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("prompt") {
body.insert("prompt".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("n") {
body.insert("n".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("size") {
body.insert("size".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("quality") {
body.insert("quality".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("response_format") {
body.insert("response_format".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /images/generations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateImageEditActor,
inports::<100>(image, prompt, mask, model, n, size),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_image_edit(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/images/edits".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("image") {
body.insert("image".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("prompt") {
body.insert("prompt".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("mask") {
body.insert("mask".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("n") {
body.insert("n".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("size") {
body.insert("size".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /images/edits failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateImageVariationActor,
inports::<100>(image, n, size, model),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_image_variation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/images/variations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("image") {
body.insert("image".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("n") {
body.insert("n".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("size") {
body.insert("size".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /images/variations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateTranscriptionActor,
inports::<100>(file, model, language, prompt, response_format, temperature),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_transcription(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/audio/transcriptions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("file") {
body.insert("file".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("language") {
body.insert("language".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("prompt") {
body.insert("prompt".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("response_format") {
body.insert("response_format".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("temperature") {
body.insert("temperature".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /audio/transcriptions failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateTranslationActor,
inports::<100>(file, model, prompt, response_format, temperature),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_translation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/audio/translations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("file") {
body.insert("file".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("prompt") {
body.insert("prompt".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("response_format") {
body.insert("response_format".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("temperature") {
body.insert("temperature".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /audio/translations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateSpeechActor,
inports::<100>(model, input, voice, response_format, speed),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_speech(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/audio/speech".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("input") {
body.insert("input".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("voice") {
body.insert("voice".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("response_format") {
body.insert("response_format".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("speed") {
body.insert("speed".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /audio/speech failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateEmbeddingActor,
inports::<100>(input, model, encoding_format, dimensions),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_embedding(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/embeddings".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("input") {
body.insert("input".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("encoding_format") {
body.insert("encoding_format".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("dimensions") {
body.insert("dimensions".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /embeddings failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateModerationActor,
inports::<100>(input, model),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_moderation(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/moderations".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("input") {
body.insert("input".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /moderations failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiListModelsActor,
inports::<100>(trigger),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_models(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/models".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /models failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadModelActor,
inports::<100>(model),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_model(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/models/{model}".to_string();
if let Some(val) = inputs.get("model") {
endpoint = endpoint.replace("{{model}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /models/{{model}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiDeleteModelActor,
inports::<100>(model),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_delete_model(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/models/{model}".to_string();
if let Some(val) = inputs.get("model") {
endpoint = endpoint.replace("{{model}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /models/{{model}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiUploadFileActor,
inports::<100>(file, purpose),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_upload_file(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/files".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("file") {
body.insert("file".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("purpose") {
body.insert("purpose".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /files failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiListFilesActor,
inports::<100>(purpose, limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_files(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/files".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("purpose") {
query_pairs.push(("purpose", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /files failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadFileActor,
inports::<100>(file_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_file(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/files/{file_id}".to_string();
if let Some(val) = inputs.get("file_id") {
endpoint = endpoint.replace("{{file_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /files/{{file_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiDeleteFileActor,
inports::<100>(file_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_delete_file(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/files/{file_id}".to_string();
if let Some(val) = inputs.get("file_id") {
endpoint = endpoint.replace("{{file_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /files/{{file_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiDownloadFileActor,
inports::<100>(file_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_download_file(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/files/{file_id}/content".to_string();
if let Some(val) = inputs.get("file_id") {
endpoint = endpoint.replace("{{file_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /files/{{file_id}}/content failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateFineTuningJobActor,
inports::<100>(training_file, model, validation_file, hyperparameters, suffix, method),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_fine_tuning_job(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/fine_tuning/jobs".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("training_file") {
body.insert("training_file".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("model") {
body.insert("model".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("validation_file") {
body.insert("validation_file".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("hyperparameters") {
body.insert("hyperparameters".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("suffix") {
body.insert("suffix".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("method") {
body.insert("method".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /fine_tuning/jobs failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiListFineTuningJobsActor,
inports::<100>(limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_fine_tuning_jobs(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/fine_tuning/jobs".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /fine_tuning/jobs failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadFineTuningJobActor,
inports::<100>(fine_tuning_job_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_fine_tuning_job(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/fine_tuning/jobs/{fine_tuning_job_id}".to_string();
if let Some(val) = inputs.get("fine_tuning_job_id") {
endpoint = endpoint.replace("{{fine_tuning_job_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /fine_tuning/jobs/{{fine_tuning_job_id}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiStopFineTuningJobActor,
inports::<100>(fine_tuning_job_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_stop_fine_tuning_job(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/fine_tuning/jobs/{fine_tuning_job_id}/cancel".to_string();
if let Some(val) = inputs.get("fine_tuning_job_id") {
endpoint = endpoint.replace("{{fine_tuning_job_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /fine_tuning/jobs/{{fine_tuning_job_id}}/cancel failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiListFineTuningJobEventsActor,
inports::<100>(fine_tuning_job_id, limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_fine_tuning_job_events(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/fine_tuning/jobs/{fine_tuning_job_id}/events".to_string();
if let Some(val) = inputs.get("fine_tuning_job_id") {
endpoint = endpoint.replace("{{fine_tuning_job_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /fine_tuning/jobs/{{fine_tuning_job_id}}/events failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiListFineTuningJobCheckpointsActor,
inports::<100>(fine_tuning_job_id, limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_fine_tuning_job_checkpoints(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/fine_tuning/jobs/{fine_tuning_job_id}/checkpoints".to_string();
if let Some(val) = inputs.get("fine_tuning_job_id") {
endpoint = endpoint.replace("{{fine_tuning_job_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /fine_tuning/jobs/{{fine_tuning_job_id}}/checkpoints failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateBatchActor,
inports::<100>(input_file_id, endpoint, completion_window, metadata),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_batch(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/batches".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("input_file_id") {
body.insert("input_file_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("endpoint") {
body.insert("endpoint".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("completion_window") {
body.insert("completion_window".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("metadata") {
body.insert("metadata".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /batches failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadBatchActor,
inports::<100>(batch_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_batch(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/batches/{batch_id}".to_string();
if let Some(val) = inputs.get("batch_id") {
endpoint = endpoint.replace("{{batch_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /batches/{{batch_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiStopBatchActor,
inports::<100>(batch_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_stop_batch(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/batches/{batch_id}/cancel".to_string();
if let Some(val) = inputs.get("batch_id") {
endpoint = endpoint.replace("{{batch_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /batches/{{batch_id}}/cancel failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiListBatchesActor,
inports::<100>(limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_batches(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/batches".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /batches failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateVectorStoreActor,
inports::<100>(name, file_ids, expires_after, chunking_strategy, metadata),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_vector_store(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/vector_stores".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("file_ids") {
body.insert("file_ids".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("expires_after") {
body.insert("expires_after".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("chunking_strategy") {
body.insert("chunking_strategy".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("metadata") {
body.insert("metadata".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /vector_stores failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiListVectorStoresActor,
inports::<100>(limit, order, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_vector_stores(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/vector_stores".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("order") {
query_pairs.push(("order", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /vector_stores failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadVectorStoreActor,
inports::<100>(vector_store_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_vector_store(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /vector_stores/{{vector_store_id}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiUpdateVectorStoreActor,
inports::<100>(vector_store_id, name, expires_after, metadata),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_update_vector_store(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("expires_after") {
body.insert("expires_after".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("metadata") {
body.insert("metadata".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /vector_stores/{{vector_store_id}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiDeleteVectorStoreActor,
inports::<100>(vector_store_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_delete_vector_store(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /vector_stores/{{vector_store_id}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateVectorStoreFileActor,
inports::<100>(vector_store_id, file_id, chunking_strategy),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_vector_store_file(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}/files".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("file_id") {
body.insert("file_id".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("chunking_strategy") {
body.insert("chunking_strategy".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /vector_stores/{{vector_store_id}}/files failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiListVectorStoreFilesActor,
inports::<100>(vector_store_id, limit, filter),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_vector_store_files(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}/files".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("filter") {
query_pairs.push(("filter", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /vector_stores/{{vector_store_id}}/files failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadVectorStoreFileActor,
inports::<100>(vector_store_id, file_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_vector_store_file(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}/files/{file_id}".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("file_id") {
endpoint = endpoint.replace("{{file_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /vector_stores/{{vector_store_id}}/files/{{file_id}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiDeleteVectorStoreFileActor,
inports::<100>(vector_store_id, file_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_delete_vector_store_file(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}/files/{file_id}".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("file_id") {
endpoint = endpoint.replace("{{file_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"DELETE /vector_stores/{{vector_store_id}}/files/{{file_id}} failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateVectorStoreFileBatchActor,
inports::<100>(vector_store_id, file_ids, chunking_strategy),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_vector_store_file_batch(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}/file_batches".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("file_ids") {
body.insert("file_ids".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("chunking_strategy") {
body.insert("chunking_strategy".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /vector_stores/{{vector_store_id}}/file_batches failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadVectorStoreFileBatchActor,
inports::<100>(vector_store_id, batch_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_vector_store_file_batch(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/vector_stores/{vector_store_id}/file_batches/{batch_id}".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("batch_id") {
endpoint = endpoint.replace("{{batch_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /vector_stores/{{vector_store_id}}/file_batches/{{batch_id}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
OpenaiStopVectorStoreFileBatchActor,
inports::<100>(vector_store_id, batch_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_stop_vector_store_file_batch(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint =
"/vector_stores/{vector_store_id}/file_batches/{batch_id}/cancel".to_string();
if let Some(val) = inputs.get("vector_store_id") {
endpoint = endpoint.replace("{{vector_store_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("batch_id") {
endpoint = endpoint.replace("{{batch_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("POST /vector_stores/{{vector_store_id}}/file_batches/{{batch_id}}/cancel failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
OpenaiCreateEvalActor,
inports::<100>(name, data_source_config, testing_criteria, metadata),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_eval(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/evals".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("data_source_config") {
body.insert("data_source_config".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("testing_criteria") {
body.insert("testing_criteria".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("metadata") {
body.insert("metadata".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /evals failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiListEvalsActor,
inports::<100>(limit, after, order),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_evals(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/evals".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if let Some(val) = inputs.get("order") {
query_pairs.push(("order", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /evals failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadEvalActor,
inports::<100>(eval_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_eval(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /evals/{{eval_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiUpdateEvalActor,
inports::<100>(eval_id, name, metadata),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_update_eval(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("metadata") {
body.insert("metadata".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /evals/{{eval_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiDeleteEvalActor,
inports::<100>(eval_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_delete_eval(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("DELETE /evals/{{eval_id}} failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateEvalRunActor,
inports::<100>(eval_id, name, data_source, metadata),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_eval_run(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}/runs".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("name") {
body.insert("name".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("data_source") {
body.insert("data_source".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("metadata") {
body.insert("metadata".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /evals/{{eval_id}}/runs failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiListEvalRunsActor,
inports::<100>(eval_id, limit, after),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_eval_runs(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}/runs".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("after") {
query_pairs.push(("after", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /evals/{{eval_id}}/runs failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadEvalRunActor,
inports::<100>(eval_id, run_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_eval_run(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}/runs/{run_id}".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("run_id") {
endpoint = endpoint.replace("{{run_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /evals/{{eval_id}}/runs/{{run_id}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiDeleteEvalRunActor,
inports::<100>(eval_id, run_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_delete_eval_run(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}/runs/{run_id}".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("run_id") {
endpoint = endpoint.replace("{{run_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.delete(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("DELETE /evals/{{eval_id}}/runs/{{run_id}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiStopEvalRunActor,
inports::<100>(eval_id, run_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_stop_eval_run(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}/runs/{run_id}/cancel".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("run_id") {
endpoint = endpoint.replace("{{run_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"POST /evals/{{eval_id}}/runs/{{run_id}}/cancel failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiListEvalRunOutputItemsActor,
inports::<100>(eval_id, run_id, limit),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_list_eval_run_output_items(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}/runs/{run_id}/output_items".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("run_id") {
endpoint = endpoint.replace("{{run_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!(
"GET /evals/{{eval_id}}/runs/{{run_id}}/output_items failed: {}",
e
)
.into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiReadEvalRunOutputItemActor,
inports::<100>(eval_id, run_id, output_item_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_read_eval_run_output_item(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/evals/{eval_id}/runs/{run_id}/output_items/{output_item_id}".to_string();
if let Some(val) = inputs.get("eval_id") {
endpoint = endpoint.replace("{{eval_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("run_id") {
endpoint = endpoint.replace("{{run_id}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("output_item_id") {
endpoint = endpoint.replace("{{output_item_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /evals/{{eval_id}}/runs/{{run_id}}/output_items/{{output_item_id}} failed: {}", e).into()));
}
}
Ok(output)
}
#[actor(
OpenaiCreateUploadActor,
inports::<100>(filename, purpose, bytes, mime_type),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_upload(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/uploads".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("filename") {
body.insert("filename".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("purpose") {
body.insert("purpose".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("bytes") {
body.insert("bytes".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("mime_type") {
body.insert("mime_type".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /uploads failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateUploadPartActor,
inports::<100>(upload_id, data),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_upload_part(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/uploads/{upload_id}/parts".to_string();
if let Some(val) = inputs.get("upload_id") {
endpoint = endpoint.replace("{{upload_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("data") {
body.insert("data".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /uploads/{{upload_id}}/parts failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenaiCreateUploadCompletionActor,
inports::<100>(upload_id, part_ids, md5),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_create_upload_completion(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/uploads/{upload_id}/complete".to_string();
if let Some(val) = inputs.get("upload_id") {
endpoint = endpoint.replace("{{upload_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut body = serde_json::Map::new();
if let Some(val) = inputs.get("part_ids") {
body.insert("part_ids".to_string(), val.clone().into());
}
if let Some(val) = inputs.get("md5") {
body.insert("md5".to_string(), val.clone().into());
}
if !body.is_empty() {
builder = builder.json(&serde_json::Value::Object(body));
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("POST /uploads/{{upload_id}}/complete failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenaiStopUploadActor,
inports::<100>(upload_id),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openai_stop_upload(context: ActorContext) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/uploads/{upload_id}/cancel".to_string();
if let Some(val) = inputs.get("upload_id") {
endpoint = endpoint.replace("{{upload_id}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.post(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("POST /uploads/{{upload_id}}/cancel failed: {}", e).into()),
);
}
}
Ok(output)
}