use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::path::{Path, PathBuf};
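// Tunables: streaming chunk size for hashing, filename length cap, maximum
// downloaded-response size, and HTTP timeouts.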
const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;
const MAX_FILENAME_LENGTH: usize = 100;
const MAX_RESPONSE_SIZE: usize = 100 * 1024 * 1024;
const CONNECT_TIMEOUT_SECS: u64 = 5;
const READ_TIMEOUT_SECS: u64 = 60;
const LOCAL_CACHE_DIRECTORY_NAME: &str = ".composio";
const ENV_LOCAL_CACHE_DIRECTORY: &str = "COMPOSIO_CACHE_DIR";
const LOCAL_OUTPUT_FILE_DIRECTORY: &str = "outputs";
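/// Returns the local cache directory: `$COMPOSIO_CACHE_DIR` when set,
/// otherwise `~/.composio` (or `./.composio` if the home directory cannot be
/// determined).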
pub fn get_local_cache_directory() -> PathBuf {
if let Ok(cache_dir) = std::env::var(ENV_LOCAL_CACHE_DIRECTORY) {
PathBuf::from(cache_dir)
} else {
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(LOCAL_CACHE_DIRECTORY_NAME)
}
}
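/// Returns the directory where downloaded tool outputs are written
/// (`<cache>/outputs`).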
pub fn get_local_output_directory() -> PathBuf {
get_local_cache_directory().join(LOCAL_OUTPUT_FILE_DIRECTORY)
}
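/// Computes the MD5 hex digest of a file, streaming it in
/// `DEFAULT_CHUNK_SIZE` chunks so large files are not loaded into memory at
/// once.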
pub fn calculate_md5(file_path: &Path) -> Result<String, std::io::Error> {
use std::fs::File;
use std::io::Read;
let mut file = File::open(file_path)?;
let mut hasher = md5::Context::new();
let mut buffer = vec![0; DEFAULT_CHUNK_SIZE];
loop {
let bytes_read = file.read(&mut buffer)?;
if bytes_read == 0 {
break;
}
hasher.consume(&buffer[..bytes_read]);
}
Ok(format!("{:x}", hasher.compute()))
}
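/// Returns `true` if the value looks like an HTTP(S) URL rather than a local
/// path. Intentionally a cheap prefix check.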
pub fn is_url(value: &str) -> bool {
value.starts_with("http://") || value.starts_with("https://")
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FileListParams {
#[serde(skip_serializing_if = "Option::is_none")]
pub cursor: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_slug: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub toolkit_slug: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileListItem {
pub filename: String,
pub md5: String,
pub mimetype: String,
pub tool_slug: String,
pub toolkit_slug: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileListResponse {
pub current_page: u32,
pub total_items: u32,
pub total_pages: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub next_cursor: Option<String>,
pub items: Vec<FileListItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileCreatePresignedUrlParams {
pub filename: String,
pub md5: String,
pub mimetype: String,
pub tool_slug: String,
pub toolkit_slug: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum FileStorageBackend {
S3,
AzureBlobStorage,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileCreatePresignedUrlMetadata {
pub storage_backend: FileStorageBackend,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileUploadResponse {
pub id: String,
pub key: String,
#[serde(rename = "type")]
pub file_type: String,
#[serde(alias = "newPresignedUrl")]
pub new_presigned_url: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<FileCreatePresignedUrlMetadata>,
}
pub type FileCreatePresignedUrlResponse = FileUploadResponse;
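/// A file argument that has been uploaded to remote storage; tools receive
/// it by name, MIME type, and storage key.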
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileUploadable {
pub name: String,
pub mimetype: String,
pub s3key: String,
}
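/// A file produced by a tool, fetchable from remote storage via a presigned
/// URL.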
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileDownloadable {
pub name: String,
pub mimetype: String,
pub s3url: String,
}
impl FileUploadable {
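    /// Uploads a local file and returns a handle referencing its storage key.
    /// If `file` is actually an HTTP(S) URL, the file is fetched and
    /// re-uploaded via [`Self::from_url`]. The flow: validate the path, guess
    /// the MIME type, hash the contents, request a presigned upload URL, then
    /// PUT the bytes.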
pub async fn from_path(
client: &crate::client::ComposioClient,
file: &Path,
tool: &str,
toolkit: &str,
) -> Result<Self, crate::error::ComposioError> {
if let Some(file_str) = file.to_str() {
if is_url(file_str) {
return Self::from_url(client, file_str, tool, toolkit).await;
}
}
if !file.exists() {
return Err(crate::error::ComposioError::FileNotFound(
file.display().to_string(),
));
}
if !file.is_file() {
return Err(crate::error::ComposioError::InvalidFile(format!(
"Not a file: {}",
file.display()
)));
}
let filename = file.file_name().and_then(|n| n.to_str()).ok_or_else(|| {
crate::error::ComposioError::InvalidFile("Invalid filename".to_string())
})?;
let mimetype = crate::utils::mimetypes::guess_mime_type(file);
let md5_hash = calculate_md5(file)?;
let upload_response =
Self::request_upload_url(client, &md5_hash, filename, &mimetype, tool, toolkit).await?;
        Self::upload_to_s3(&upload_response.new_presigned_url, file, &mimetype).await?;
Ok(Self {
name: filename.to_string(),
mimetype,
s3key: upload_response.key,
})
}
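    /// Fetches a file from a direct URL (redirects are rejected) and uploads
    /// the bytes to remote storage.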
pub async fn from_url(
client: &crate::client::ComposioClient,
url: &str,
tool: &str,
toolkit: &str,
) -> Result<Self, crate::error::ComposioError> {
let (filename, content, mimetype) = Self::fetch_from_url(url).await?;
let s3key =
Self::upload_bytes_to_s3(client, &filename, &content, &mimetype, tool, toolkit).await?;
Ok(Self {
name: filename,
mimetype,
s3key,
})
}
async fn request_upload_url(
client: &crate::client::ComposioClient,
md5: &str,
filename: &str,
mimetype: &str,
tool: &str,
toolkit: &str,
) -> Result<FileUploadResponse, crate::error::ComposioError> {
let params = FileCreatePresignedUrlParams {
filename: filename.to_string(),
md5: md5.to_string(),
mimetype: mimetype.to_string(),
tool_slug: tool.to_string(),
toolkit_slug: toolkit.to_string(),
};
client.create_file_upload_request(params).await
}
    async fn upload_to_s3(
        url: &str,
        file: &Path,
        mimetype: &str,
    ) -> Result<(), crate::error::ComposioError> {
        let file_content = tokio::fs::read(file).await?;
        // Set Content-Type to match the byte-upload path below; presigned
        // URLs are commonly signed against it.
        let response = reqwest::Client::new()
            .put(url)
            .header("Content-Type", mimetype)
            .body(file_content)
            .send()
            .await?;
        // Accept any 2xx status: S3 returns 200 OK but Azure Blob Storage
        // (also a supported backend) returns 201 Created.
        if !response.status().is_success() {
            return Err(crate::error::ComposioError::UploadFailed(format!(
                "Upload failed with status: {}",
                response.status()
            )));
        }
        Ok(())
    }
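    /// Fetches a URL with conservative timeouts, rejecting redirects and
    /// bodies larger than `MAX_RESPONSE_SIZE`. Returns `(filename, bytes,
    /// mimetype)`, with the MIME type taken from the `Content-Type` header
    /// when present.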
async fn fetch_from_url(
url: &str,
) -> Result<(String, Vec<u8>, String), crate::error::ComposioError> {
use reqwest::redirect::Policy;
let client = reqwest::Client::builder()
            .redirect(Policy::none())
            .connect_timeout(std::time::Duration::from_secs(CONNECT_TIMEOUT_SECS))
.timeout(std::time::Duration::from_secs(READ_TIMEOUT_SECS))
.build()?;
let response = client.get(url).send().await?;
if response.status().is_redirection() {
return Err(crate::error::ComposioError::UploadFailed(
"URL returned redirect. Please provide a direct URL to the file.".to_string(),
));
}
if !response.status().is_success() {
return Err(crate::error::ComposioError::UploadFailed(format!(
"Failed to fetch file from URL. Status: {}",
response.status()
)));
}
if let Some(content_length) = response.content_length() {
            if content_length > MAX_RESPONSE_SIZE as u64 {
return Err(crate::error::ComposioError::FileTooLarge(format!(
"File size ({} bytes) exceeds maximum ({} bytes)",
content_length, MAX_RESPONSE_SIZE
)));
}
}
let mimetype = response
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.map(|ct| crate::utils::mimetypes::extract_from_content_type(ct))
.unwrap_or_else(|| crate::utils::mimetypes::DEFAULT_MIME_TYPE.to_string());
let bytes = response.bytes().await?;
if bytes.len() > MAX_RESPONSE_SIZE {
return Err(crate::error::ComposioError::FileTooLarge(format!(
"Response size exceeds maximum ({} bytes)",
MAX_RESPONSE_SIZE
)));
}
let filename = Self::extract_filename_from_url(url, &mimetype);
Ok((filename, bytes.to_vec(), mimetype))
}
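    /// Derives a filename from the last URL path segment, falling back to a
    /// generated timestamped name when no usable segment exists.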
fn extract_filename_from_url(url: &str, mimetype: &str) -> String {
use url::Url;
if let Ok(parsed) = Url::parse(url) {
if let Some(segments) = parsed.path_segments() {
if let Some(last) = segments.last() {
let decoded = urlencoding::decode(last).unwrap_or_default();
if !decoded.is_empty() {
return Self::truncate_filename(&decoded);
}
}
}
}
Self::generate_timestamped_filename(mimetype)
}
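    /// Despite the name, an over-long filename (more than
    /// `MAX_FILENAME_LENGTH` bytes) is not trimmed in place: its extension is
    /// kept and the rest is replaced with a generated timestamped name.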
fn truncate_filename(filename: &str) -> String {
if filename.len() <= MAX_FILENAME_LENGTH {
return filename.to_string();
}
let extension = if let Some(pos) = filename.rfind('.') {
&filename[pos..]
} else {
""
};
Self::generate_timestamped_filename(extension)
}
fn generate_timestamped_filename(extension: &str) -> String {
use chrono::Utc;
use uuid::Uuid;
let timestamp = Utc::now().format("%Y%m%d_%H%M%S");
let unique_id = &Uuid::new_v4().to_string()[..8];
format!("file_{}_{}{}", timestamp, unique_id, extension)
}
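    /// Uploads an in-memory buffer through a presigned URL and returns the
    /// resulting storage key.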
async fn upload_bytes_to_s3(
client: &crate::client::ComposioClient,
filename: &str,
content: &[u8],
mimetype: &str,
tool: &str,
toolkit: &str,
) -> Result<String, crate::error::ComposioError> {
let md5_hash = format!("{:x}", md5::compute(content));
let upload_response =
Self::request_upload_url(client, &md5_hash, filename, mimetype, tool, toolkit).await?;
let response = reqwest::Client::new()
.put(&upload_response.new_presigned_url)
.header("Content-Type", mimetype)
.body(content.to_vec())
.send()
.await?;
        // Accept any 2xx status (Azure Blob Storage returns 201 Created).
        if !response.status().is_success() {
            return Err(crate::error::ComposioError::UploadFailed(format!(
                "Upload failed with status: {}",
response.status()
)));
}
Ok(upload_response.key)
}
}
impl FileDownloadable {
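    /// Downloads the file into `outdir` (created if missing) and returns the
    /// written path.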
    pub async fn download(&self, outdir: &Path) -> Result<PathBuf, crate::error::ComposioError> {
        tokio::fs::create_dir_all(outdir).await?;
        // Keep only the final path component so a crafted name such as
        // "../evil" cannot escape the output directory.
        let safe_name = Path::new(&self.name).file_name().ok_or_else(|| {
            crate::error::ComposioError::InvalidFile("Invalid filename".to_string())
        })?;
        let outfile = outdir.join(safe_name);
        let response = reqwest::get(&self.s3url).await?;
        if !response.status().is_success() {
            return Err(crate::error::ComposioError::DownloadFailed(format!(
                "Failed to download file. Status: {}",
                response.status()
            )));
        }
        let bytes = response.bytes().await?;
        tokio::fs::write(&outfile, bytes).await?;
        Ok(outfile)
    }
}
}
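/// Prepares tool schemas for file handling: detects `file_uploadable` /
/// `file_downloadable` markers, rewrites marked properties to path strings,
/// and enriches property descriptions with type and required-ness hints.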
pub struct FileHelper {
_outdir: PathBuf,
}
impl FileHelper {
pub fn new(outdir: Option<PathBuf>) -> Self {
let outdir = outdir.unwrap_or_else(get_local_output_directory);
Self { _outdir: outdir }
}
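    /// Recursively checks whether any subschema (via `anyOf`, `oneOf`,
    /// `allOf`, `properties`, or `items`) carries the given boolean marker
    /// set to `true`.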
fn has_file_property(&self, schema: &JsonValue, property_name: &str) -> bool {
if let Some(obj) = schema.as_object() {
if obj
.get(property_name)
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
return true;
}
if let Some(any_of) = obj.get("anyOf").and_then(|v| v.as_array()) {
for variant in any_of {
if self.has_file_property(variant, property_name) {
return true;
}
}
}
if let Some(one_of) = obj.get("oneOf").and_then(|v| v.as_array()) {
for variant in one_of {
if self.has_file_property(variant, property_name) {
return true;
}
}
}
if let Some(all_of) = obj.get("allOf").and_then(|v| v.as_array()) {
for variant in all_of {
if self.has_file_property(variant, property_name) {
return true;
}
}
}
if let Some(properties) = obj.get("properties").and_then(|v| v.as_object()) {
for prop in properties.values() {
if self.has_file_property(prop, property_name) {
return true;
}
}
}
if let Some(items) = obj.get("items") {
if self.has_file_property(items, property_name) {
return true;
}
}
}
false
}
pub fn is_file_uploadable(&self, schema: &JsonValue) -> bool {
self.has_file_property(schema, "file_uploadable")
}
pub fn is_file_downloadable(&self, schema: &JsonValue) -> bool {
self.has_file_property(schema, "file_downloadable")
}
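    /// Recursively rewrites every subschema marked `file_uploadable: true`
    /// into a plain `{"type": "string", "format": "path"}` schema so callers
    /// supply a local path (or URL) that is uploaded on their behalf.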
pub fn transform_schema_for_file_upload(&self, schema: JsonValue) -> JsonValue {
if let Some(mut obj) = schema.as_object().cloned() {
if obj
.get("file_uploadable")
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
                let mut transformed = serde_json::json!({
                    "type": "string",
                    "format": "path",
                    "description": obj.get("description")
                        .and_then(|v| v.as_str())
                        .unwrap_or("Path to file."),
                    "file_uploadable": true,
                });
                // Only carry `title` over when present; `"title": null` is
                // not valid JSON Schema.
                if let Some(title) = obj.get("title") {
                    transformed["title"] = title.clone();
                }
                return transformed;
}
if let Some(any_of) = obj.get("anyOf").and_then(|v| v.as_array()) {
let transformed: Vec<JsonValue> = any_of
.iter()
.map(|v| self.transform_schema_for_file_upload(v.clone()))
.collect();
obj.insert("anyOf".to_string(), JsonValue::Array(transformed));
}
if let Some(one_of) = obj.get("oneOf").and_then(|v| v.as_array()) {
let transformed: Vec<JsonValue> = one_of
.iter()
.map(|v| self.transform_schema_for_file_upload(v.clone()))
.collect();
obj.insert("oneOf".to_string(), JsonValue::Array(transformed));
}
if let Some(all_of) = obj.get("allOf").and_then(|v| v.as_array()) {
let transformed: Vec<JsonValue> = all_of
.iter()
.map(|v| self.transform_schema_for_file_upload(v.clone()))
.collect();
obj.insert("allOf".to_string(), JsonValue::Array(transformed));
}
            if let Some(properties) = obj.get("properties").and_then(|v| v.as_object()) {
                // Collect directly into a serde_json::Map so key order stays
                // deterministic and no fallible serde round-trip is needed.
                let transformed: serde_json::Map<String, JsonValue> = properties
                    .iter()
                    .map(|(k, v)| (k.clone(), self.transform_schema_for_file_upload(v.clone())))
                    .collect();
                obj.insert("properties".to_string(), JsonValue::Object(transformed));
            }
if let Some(items) = obj.get("items") {
let transformed = self.transform_schema_for_file_upload(items.clone());
obj.insert("items".to_string(), transformed);
}
return JsonValue::Object(obj);
}
schema
}
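    /// Applies the file-upload rewrite to every property of a top-level
    /// object schema.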
    pub fn process_file_uploadable_schema(&self, mut schema: JsonValue) -> JsonValue {
        if let Some(obj) = schema.as_object_mut() {
            if let Some(properties) = obj.get("properties").and_then(|v| v.as_object()).cloned() {
                // Transform each parameter schema individually. The
                // `properties` map itself is not a schema, so passing it
                // straight to `transform_schema_for_file_upload` would skip
                // every parameter.
                let transformed: serde_json::Map<String, JsonValue> = properties
                    .into_iter()
                    .map(|(k, v)| (k, self.transform_schema_for_file_upload(v)))
                    .collect();
                obj.insert("properties".to_string(), JsonValue::Object(transformed));
            }
        }
        schema
    }
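    /// Appends explicit hints to each property description of an object
    /// schema: the expected primitive type, and whether the parameter is
    /// required.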
pub fn enhance_schema_descriptions(&self, mut schema: JsonValue) -> JsonValue {
if let Some(obj) = schema.as_object_mut() {
let required = obj
.get("required")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str())
.map(|s| s.to_string())
.collect::<Vec<_>>()
})
.unwrap_or_default();
if let Some(properties) = obj.get_mut("properties").and_then(|v| v.as_object_mut()) {
for (param, prop_schema) in properties.iter_mut() {
if let Some(prop_obj) = prop_schema.as_object_mut() {
if let Some(type_str) = prop_obj.get("type").and_then(|v| v.as_str()) {
if matches!(type_str, "string" | "integer" | "number" | "boolean") {
let desc = prop_obj
.get("description")
.and_then(|v| v.as_str())
.unwrap_or("")
.trim_end_matches('.');
let ext = format!("Please provide a value of type {}.", type_str);
let new_desc = if desc.is_empty() {
ext
} else {
format!("{}. {}", desc, ext)
};
prop_obj
.insert("description".to_string(), JsonValue::String(new_desc));
}
}
if required.contains(param) {
let desc = prop_obj
.get("description")
.and_then(|v| v.as_str())
.unwrap_or("")
.trim_end_matches('.');
let new_desc = if desc.is_empty() {
"This parameter is required.".to_string()
} else {
format!("{}. This parameter is required.", desc)
};
prop_obj.insert("description".to_string(), JsonValue::String(new_desc));
}
}
}
}
}
schema
}
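    /// Runs the file-upload transformation followed by description
    /// enhancement. A minimal sketch of the effect (the input schema here is
    /// illustrative, not a fixed contract):
    ///
    /// ```ignore
    /// use serde_json::json;
    ///
    /// let helper = FileHelper::new(None);
    /// let schema = json!({
    ///     "properties": {
    ///         "attachment": { "file_uploadable": true, "description": "File to send." }
    ///     },
    ///     "required": ["attachment"]
    /// });
    /// let processed = helper.process_schema_recursively(schema);
    /// // "attachment" is now { "type": "string", "format": "path", ... } and its
    /// // description ends with "Please provide a value of type string. This
    /// // parameter is required."
    /// ```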
pub fn process_schema_recursively(&self, schema: JsonValue) -> JsonValue {
let schema = self.process_file_uploadable_schema(schema);
self.enhance_schema_descriptions(schema)
}
}