#[cfg(feature = "composio")]
use std::sync::Arc;
#[cfg(feature = "composio")]
use async_trait::async_trait;
#[cfg(feature = "composio")]
use crate::integrations::composio::transport::SseTransport;
#[cfg(feature = "composio")]
use crate::integrations::composio::ComposioError;
#[cfg(feature = "composio")]
use crate::reasoning::circuit_breaker::CircuitBreakerRegistry;
#[cfg(feature = "composio")]
use crate::reasoning::executor::ActionExecutor;
#[cfg(feature = "composio")]
use crate::reasoning::inference::ToolDefinition;
#[cfg(feature = "composio")]
use crate::reasoning::loop_types::{LoopConfig, Observation, ProposedAction};
#[cfg(feature = "composio")]
pub struct ComposioToolExecutor {
transport: Arc<SseTransport>,
tool_definitions: Vec<ToolDefinition>,
}
#[cfg(feature = "composio")]
impl ComposioToolExecutor {
pub async fn discover(transport: Arc<SseTransport>) -> Result<Self, ComposioError> {
let result = transport
.request("tools/list", serde_json::json!({}))
.await?;
let tools_value = result.get("tools").cloned().unwrap_or(result.clone());
let raw_tools: Vec<serde_json::Value> =
serde_json::from_value(tools_value).map_err(|e| ComposioError::TransportError {
reason: format!("failed to parse tools/list response: {}", e),
})?;
let tool_definitions = raw_tools
.into_iter()
.map(|t| {
let name = t["name"].as_str().unwrap_or("unknown").to_string();
let description = t["description"].as_str().unwrap_or("").to_string();
let parameters = t
.get("inputSchema")
.or_else(|| t.get("parameters"))
.cloned()
.unwrap_or(serde_json::json!({
"type": "object",
"properties": {},
"required": []
}));
ToolDefinition {
name,
description,
parameters,
}
})
.collect();
Ok(Self {
transport,
tool_definitions,
})
}
pub fn tool_definitions(&self) -> &[ToolDefinition] {
&self.tool_definitions
}
async fn call_tool(&self, name: &str, arguments: &str) -> Result<String, String> {
let args: serde_json::Value =
serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
let params = serde_json::json!({
"name": name,
"arguments": args,
});
let result = self
.transport
.request("tools/call", params)
.await
.map_err(|e| e.to_string())?;
if let Some(content) = result.get("content") {
if let Some(arr) = content.as_array() {
let texts: Vec<&str> = arr
.iter()
.filter_map(|c| c.get("text").and_then(|t| t.as_str()))
.collect();
if !texts.is_empty() {
return Ok(texts.join("\n"));
}
}
}
Ok(serde_json::to_string_pretty(&result).unwrap_or_default())
}
}
#[cfg(feature = "composio")]
#[async_trait]
impl ActionExecutor for ComposioToolExecutor {
async fn execute_actions(
&self,
actions: &[ProposedAction],
_config: &LoopConfig,
_circuit_breakers: &CircuitBreakerRegistry,
) -> Vec<Observation> {
let mut observations = Vec::new();
for action in actions {
if let ProposedAction::ToolCall {
call_id,
name,
arguments,
} = action
{
match self.call_tool(name, arguments).await {
Ok(result) => {
observations.push(
Observation::tool_result(name.clone(), result)
.with_call_id(call_id.clone()),
);
}
Err(err) => {
observations.push(
Observation::tool_error(name.clone(), err)
.with_call_id(call_id.clone()),
);
}
}
}
}
observations
}
fn tool_definitions(&self) -> Vec<ToolDefinition> {
self.tool_definitions.clone()
}
}
#[cfg(test)]
#[cfg(feature = "composio")]
mod tests {
use super::*;
#[test]
fn test_tool_definition_parsing() {
let raw = serde_json::json!([
{
"name": "TWITTER_CREATE_TWEET",
"description": "Post a tweet to Twitter/X",
"inputSchema": {
"type": "object",
"properties": {
"text": { "type": "string", "description": "Tweet text" }
},
"required": ["text"]
}
}
]);
let tools: Vec<serde_json::Value> = serde_json::from_value(raw).unwrap();
let defs: Vec<ToolDefinition> = tools
.into_iter()
.map(|t| {
let name = t["name"].as_str().unwrap_or("unknown").to_string();
let description = t["description"].as_str().unwrap_or("").to_string();
let parameters = t
.get("inputSchema")
.or_else(|| t.get("parameters"))
.cloned()
.unwrap_or(serde_json::json!({}));
ToolDefinition {
name,
description,
parameters,
}
})
.collect();
assert_eq!(defs.len(), 1);
assert_eq!(defs[0].name, "TWITTER_CREATE_TWEET");
assert!(defs[0].parameters["properties"]["text"].is_object());
}
#[test]
fn test_mcp_content_extraction() {
let result = serde_json::json!({
"content": [
{ "type": "text", "text": "Tweet posted successfully" }
]
});
if let Some(content) = result.get("content") {
if let Some(arr) = content.as_array() {
let texts: Vec<&str> = arr
.iter()
.filter_map(|c| c.get("text").and_then(|t| t.as_str()))
.collect();
assert_eq!(texts, vec!["Tweet posted successfully"]);
}
}
}
}