use composio_sdk::{ComposioClient, ComposioError, MetaToolSlug};
use serde_json::json;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use wiremock::matchers::{header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
async fn create_mock_session(mock_server: &MockServer) {
Mock::given(method("POST"))
.and(path("/tool_router/session"))
.and(header("x-api-key", "test_key"))
.respond_with(ResponseTemplate::new(201).set_body_json(json!({
"session_id": "sess_retry",
"mcp": {"url": "https://mcp.composio.dev"},
"tool_router_tools": [],
"config": {
"user_id": "user_retry"
}
})))
.mount(mock_server)
.await;
}
#[tokio::test]
async fn test_retry_on_429_rate_limit() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
let count = counter_clone.fetch_add(1, Ordering::SeqCst);
if count < 2 {
ResponseTemplate::new(429).set_body_json(json!({
"message": "Rate limit exceeded",
"status": 429,
"code": "RATE_LIMIT_EXCEEDED",
"request_id": format!("req_429_{}", count),
"suggested_fix": "Wait before retrying"
}))
} else {
ResponseTemplate::new(200).set_body_json(json!({
"data": {"success": true},
"error": null,
"log_id": "log_success"
}))
}
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_GET_REPOS", json!({}))
.await
.unwrap();
assert_eq!(result.log_id, "log_success");
assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn test_retry_on_500_internal_server_error() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
let count = counter_clone.fetch_add(1, Ordering::SeqCst);
if count < 1 {
ResponseTemplate::new(500).set_body_json(json!({
"message": "Internal server error",
"status": 500,
"code": "INTERNAL_SERVER_ERROR"
}))
} else {
ResponseTemplate::new(200).set_body_json(json!({
"data": {"success": true},
"error": null,
"log_id": "log_500_success"
}))
}
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_GET_REPOS", json!({}))
.await
.unwrap();
assert_eq!(result.log_id, "log_500_success");
assert_eq!(attempt_counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_retry_on_502_bad_gateway() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
let count = counter_clone.fetch_add(1, Ordering::SeqCst);
if count < 1 {
ResponseTemplate::new(502).set_body_json(json!({
"message": "Bad gateway",
"status": 502
}))
} else {
ResponseTemplate::new(200).set_body_json(json!({
"data": {"success": true},
"error": null,
"log_id": "log_502_success"
}))
}
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_GET_REPOS", json!({}))
.await
.unwrap();
assert_eq!(result.log_id, "log_502_success");
assert_eq!(attempt_counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_retry_on_503_service_unavailable() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
let count = counter_clone.fetch_add(1, Ordering::SeqCst);
if count < 1 {
ResponseTemplate::new(503).set_body_json(json!({
"message": "Service temporarily unavailable",
"status": 503
}))
} else {
ResponseTemplate::new(200).set_body_json(json!({
"data": {"success": true},
"error": null,
"log_id": "log_503_success"
}))
}
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_GET_REPOS", json!({}))
.await
.unwrap();
assert_eq!(result.log_id, "log_503_success");
assert_eq!(attempt_counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_retry_on_504_gateway_timeout() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
let count = counter_clone.fetch_add(1, Ordering::SeqCst);
if count < 1 {
ResponseTemplate::new(504).set_body_json(json!({
"message": "Gateway timeout",
"status": 504
}))
} else {
ResponseTemplate::new(200).set_body_json(json!({
"data": {"success": true},
"error": null,
"log_id": "log_504_success"
}))
}
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_GET_REPOS", json!({}))
.await
.unwrap();
assert_eq!(result.log_id, "log_504_success");
assert_eq!(attempt_counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_retry_exhaustion_on_persistent_500() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
counter_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(500).set_body_json(json!({
"message": "Persistent server error",
"status": 500,
"code": "PERSISTENT_ERROR",
"request_id": "req_persistent"
}))
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_GET_REPOS", json!({}))
.await;
assert!(result.is_err());
match result.unwrap_err() {
ComposioError::ApiError { status, .. } => {
assert_eq!(status, 500);
}
_ => panic!("Expected ApiError"),
}
assert_eq!(attempt_counter.load(Ordering::SeqCst), 4);
}
#[tokio::test]
async fn test_meta_tool_retry_on_503() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute_meta"))
.respond_with(move |_req: &wiremock::Request| {
let count = counter_clone.fetch_add(1, Ordering::SeqCst);
if count < 1 {
ResponseTemplate::new(503).set_body_json(json!({
"message": "Service temporarily unavailable",
"status": 503
}))
} else {
ResponseTemplate::new(200).set_body_json(json!({
"data": {"tools": []},
"error": null,
"log_id": "log_meta_success"
}))
}
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_meta_tool(
MetaToolSlug::ComposioSearchTools,
json!({"query": "test"}),
)
.await
.unwrap();
assert_eq!(result.log_id, "log_meta_success");
assert_eq!(attempt_counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_retry_with_exponential_backoff() {
use std::time::Instant;
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
let count = counter_clone.fetch_add(1, Ordering::SeqCst);
if count < 2 {
ResponseTemplate::new(503).set_body_json(json!({
"message": "Service unavailable",
"status": 503
}))
} else {
ResponseTemplate::new(200).set_body_json(json!({
"data": {"success": true},
"error": null,
"log_id": "log_backoff_success"
}))
}
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(50))
.max_retry_delay(std::time::Duration::from_millis(500))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let start = Instant::now();
let result = session
.execute_tool("GITHUB_GET_REPOS", json!({}))
.await
.unwrap();
let elapsed = start.elapsed();
assert_eq!(result.log_id, "log_backoff_success");
assert_eq!(attempt_counter.load(Ordering::SeqCst), 3);
assert!(
elapsed.as_millis() >= 100,
"Expected exponential backoff delays, got {:?}",
elapsed
);
}
#[tokio::test]
async fn test_is_retryable_method() {
for status in [429, 500, 502, 503, 504] {
let error = ComposioError::ApiError {
status,
message: "Test error".to_string(),
code: None,
slug: None,
request_id: None,
suggested_fix: None,
errors: None,
};
assert!(
error.is_retryable(),
"Status {} should be retryable",
status
);
}
for status in [400, 401, 403, 404] {
let error = ComposioError::ApiError {
status,
message: "Test error".to_string(),
code: None,
slug: None,
request_id: None,
suggested_fix: None,
errors: None,
};
assert!(
!error.is_retryable(),
"Status {} should not be retryable",
status
);
}
let invalid_input = ComposioError::InvalidInput("Test".to_string());
assert!(!invalid_input.is_retryable());
let config_error = ComposioError::ConfigError("Test".to_string());
assert!(!config_error.is_retryable());
}
#[tokio::test]
async fn test_no_retry_on_400_bad_request() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
counter_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(400).set_body_json(json!({
"message": "Invalid arguments",
"status": 400,
"code": "INVALID_ARGUMENTS"
}))
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_CREATE_ISSUE", json!({}))
.await;
assert!(result.is_err());
match result.unwrap_err() {
ComposioError::ApiError { status, .. } => {
assert_eq!(status, 400);
}
_ => panic!("Expected ApiError"),
}
assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_no_retry_on_401_unauthorized() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
counter_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(401).set_body_json(json!({
"message": "No connected account",
"status": 401,
"code": "NO_CONNECTED_ACCOUNT"
}))
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_CREATE_ISSUE", json!({}))
.await;
assert!(result.is_err());
match result.unwrap_err() {
ComposioError::ApiError { status, .. } => {
assert_eq!(status, 401);
}
_ => panic!("Expected ApiError"),
}
assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_no_retry_on_403_forbidden() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
counter_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(403).set_body_json(json!({
"message": "Insufficient permissions",
"status": 403,
"code": "INSUFFICIENT_PERMISSIONS"
}))
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("GITHUB_DELETE_REPO", json!({}))
.await;
assert!(result.is_err());
match result.unwrap_err() {
ComposioError::ApiError { status, .. } => {
assert_eq!(status, 403);
}
_ => panic!("Expected ApiError"),
}
assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_no_retry_on_404_not_found() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute"))
.respond_with(move |_req: &wiremock::Request| {
counter_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(404).set_body_json(json!({
"message": "Tool not found",
"status": 404,
"code": "TOOL_NOT_FOUND"
}))
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_tool("INVALID_TOOL", json!({}))
.await;
assert!(result.is_err());
match result.unwrap_err() {
ComposioError::ApiError { status, .. } => {
assert_eq!(status, 404);
}
_ => panic!("Expected ApiError"),
}
assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_meta_tool_no_retry_on_400() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute_meta"))
.respond_with(move |_req: &wiremock::Request| {
counter_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(400).set_body_json(json!({
"message": "Missing required field",
"status": 400,
"code": "MISSING_REQUIRED_FIELD"
}))
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_meta_tool(MetaToolSlug::ComposioSearchTools, json!({}))
.await;
assert!(result.is_err());
match result.unwrap_err() {
ComposioError::ApiError { status, .. } => {
assert_eq!(status, 400);
}
_ => panic!("Expected ApiError"),
}
assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_meta_tool_no_retry_on_404() {
let mock_server = MockServer::start().await;
create_mock_session(&mock_server).await;
let attempt_counter = Arc::new(AtomicU32::new(0));
let counter_clone = attempt_counter.clone();
Mock::given(method("POST"))
.and(path("/tool_router/session/sess_retry/execute_meta"))
.respond_with(move |_req: &wiremock::Request| {
counter_clone.fetch_add(1, Ordering::SeqCst);
ResponseTemplate::new(404).set_body_json(json!({
"message": "Session not found",
"status": 404,
"code": "SESSION_NOT_FOUND"
}))
})
.mount(&mock_server)
.await;
let client = ComposioClient::builder()
.api_key("test_key")
.base_url(mock_server.uri())
.max_retries(3)
.initial_retry_delay(std::time::Duration::from_millis(10))
.build()
.unwrap();
let session = client.create_session("user_retry").send().await.unwrap();
let result = session
.execute_meta_tool(
MetaToolSlug::ComposioSearchTools,
json!({"query": "test"}),
)
.await;
assert!(result.is_err());
match result.unwrap_err() {
ComposioError::ApiError { status, .. } => {
assert_eq!(status, 404);
}
_ => panic!("Expected ApiError"),
}
assert_eq!(attempt_counter.load(Ordering::SeqCst), 1);
}