composio_sdk/meta_tools/connections.rs
1//! Connection Manager Implementation
2//!
3//! Native Rust implementation of COMPOSIO_MANAGE_CONNECTIONS meta tool.
4//! Handles OAuth and API key authentication management.
5
6use crate::client::ComposioClient;
7use crate::error::ComposioError;
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10
11/// Connection status
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
13#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
14pub enum ConnectionStatus {
15 Active,
16 Initiated,
17 Expired,
18 Failed,
19 Inactive,
20}
21
22/// Connected account information
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ConnectedAccount {
25 /// Account ID
26 pub id: String,
27
28 /// Toolkit slug
29 pub toolkit: String,
30
31 /// Connection status
32 pub status: ConnectionStatus,
33
34 /// User ID
35 pub user_id: String,
36
37 /// Created timestamp
38 pub created_at: String,
39
40 /// Updated timestamp
41 pub updated_at: String,
42}
43
44/// Authorization link response
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct AuthLink {
47 /// Redirect URL for OAuth flow
48 pub redirect_url: String,
49
50 /// Link token
51 pub link_token: String,
52
53 /// Expiration timestamp
54 pub expires_at: String,
55
56 /// Connected account ID (if already exists)
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub connected_account_id: Option<String>,
59}
60
61/// Connection manager
62pub struct ConnectionManager {
63 client: Arc<ComposioClient>,
64}
65
66impl ConnectionManager {
67 /// Create a new connection manager instance
68 ///
69 /// # Arguments
70 ///
71 /// * `client` - Composio client instance
72 ///
73 /// # Example
74 ///
75 /// ```no_run
76 /// use composio_sdk::{ComposioClient, meta_tools::ConnectionManager};
77 /// use std::sync::Arc;
78 ///
79 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
80 /// let client = ComposioClient::builder()
81 /// .api_key("your-api-key")
82 /// .build()?;
83 ///
84 /// let manager = ConnectionManager::new(Arc::new(client));
85 /// # Ok(())
86 /// # }
87 /// ```
88 pub fn new(client: Arc<ComposioClient>) -> Self {
89 Self { client }
90 }
91
92 /// List all connected accounts for a session
93 ///
94 /// # Arguments
95 ///
96 /// * `session_id` - Session ID
97 ///
98 /// # Returns
99 ///
100 /// Vector of connected accounts
101 ///
102 /// # Example
103 ///
104 /// ```no_run
105 /// # use composio_sdk::{ComposioClient, meta_tools::ConnectionManager};
106 /// # use std::sync::Arc;
107 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
108 /// # let client = Arc::new(ComposioClient::builder().api_key("key").build()?);
109 /// let manager = ConnectionManager::new(client);
110 /// let accounts = manager.list_connections("session_123").await?;
111 ///
112 /// for account in accounts {
113 /// println!("{}: {:?}", account.toolkit, account.status);
114 /// }
115 /// # Ok(())
116 /// # }
117 /// ```
118 pub async fn list_connections(
119 &self,
120 session_id: &str,
121 ) -> Result<Vec<ConnectedAccount>, ComposioError> {
122 let url = format!(
123 "{}/tool_router/session/{}/toolkits",
124 self.client.config().base_url,
125 session_id
126 );
127
128 let response = self
129 .client
130 .http_client()
131 .get(&url)
132 .send()
133 .await?;
134
135 if !response.status().is_success() {
136 return Err(ComposioError::from_response(response).await);
137 }
138
139 let data: serde_json::Value = response.json().await?;
140
141 let accounts = data["data"]["items"]
142 .as_array()
143 .ok_or_else(|| {
144 ComposioError::InvalidInput("Invalid response format".to_string())
145 })?
146 .iter()
147 .filter_map(|item| {
148 item["connected_account"]
149 .as_object()
150 .and_then(|acc| serde_json::from_value(serde_json::Value::Object(acc.clone())).ok())
151 })
152 .collect();
153
154 Ok(accounts)
155 }
156
157 /// Create authorization link for a toolkit
158 ///
159 /// # Arguments
160 ///
161 /// * `session_id` - Session ID
162 /// * `toolkit` - Toolkit slug (e.g., "github", "gmail")
163 /// * `callback_url` - Optional callback URL after OAuth
164 ///
165 /// # Returns
166 ///
167 /// Authorization link with redirect URL
168 ///
169 /// # Example
170 ///
171 /// ```no_run
172 /// # use composio_sdk::{ComposioClient, meta_tools::ConnectionManager};
173 /// # use std::sync::Arc;
174 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
175 /// # let client = Arc::new(ComposioClient::builder().api_key("key").build()?);
176 /// let manager = ConnectionManager::new(client);
177 /// let link = manager.create_auth_link(
178 /// "session_123",
179 /// "github",
180 /// Some("https://myapp.com/callback"),
181 /// ).await?;
182 ///
183 /// println!("Redirect user to: {}", link.redirect_url);
184 /// # Ok(())
185 /// # }
186 /// ```
187 pub async fn create_auth_link(
188 &self,
189 session_id: &str,
190 toolkit: &str,
191 callback_url: Option<&str>,
192 ) -> Result<AuthLink, ComposioError> {
193 let url = format!(
194 "{}/tool_router/session/{}/link",
195 self.client.config().base_url,
196 session_id
197 );
198
199 let mut body = serde_json::json!({
200 "toolkit": toolkit,
201 });
202
203 if let Some(callback) = callback_url {
204 body["callback_url"] = serde_json::json!(callback);
205 }
206
207 let response = self
208 .client
209 .http_client()
210 .post(&url)
211 .json(&body)
212 .send()
213 .await?;
214
215 if !response.status().is_success() {
216 return Err(ComposioError::from_response(response).await);
217 }
218
219 let data: serde_json::Value = response.json().await?;
220
221 let link: AuthLink = serde_json::from_value(data["data"].clone())?;
222
223 Ok(link)
224 }
225
226 /// Check if a toolkit is connected
227 ///
228 /// # Arguments
229 ///
230 /// * `session_id` - Session ID
231 /// * `toolkit` - Toolkit slug
232 ///
233 /// # Returns
234 ///
235 /// True if toolkit is connected and active
236 ///
237 /// # Example
238 ///
239 /// ```no_run
240 /// # use composio_sdk::{ComposioClient, meta_tools::ConnectionManager};
241 /// # use std::sync::Arc;
242 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
243 /// # let client = Arc::new(ComposioClient::builder().api_key("key").build()?);
244 /// let manager = ConnectionManager::new(client);
245 /// let is_connected = manager.is_connected("session_123", "github").await?;
246 ///
247 /// if !is_connected {
248 /// println!("GitHub is not connected. Please authenticate.");
249 /// }
250 /// # Ok(())
251 /// # }
252 /// ```
253 pub async fn is_connected(&self, session_id: &str, toolkit: &str) -> Result<bool, ComposioError> {
254 let accounts = self.list_connections(session_id).await?;
255
256 Ok(accounts
257 .iter()
258 .any(|acc| acc.toolkit == toolkit && acc.status == ConnectionStatus::Active))
259 }
260
261 /// Get connection status for a specific toolkit
262 ///
263 /// # Arguments
264 ///
265 /// * `session_id` - Session ID
266 /// * `toolkit` - Toolkit slug
267 ///
268 /// # Returns
269 ///
270 /// Connection status or None if not found
271 ///
272 /// # Example
273 ///
274 /// ```no_run
275 /// # use composio_sdk::{ComposioClient, meta_tools::ConnectionManager};
276 /// # use std::sync::Arc;
277 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
278 /// # let client = Arc::new(ComposioClient::builder().api_key("key").build()?);
279 /// let manager = ConnectionManager::new(client);
280 /// let status = manager.get_connection_status("session_123", "github").await?;
281 ///
282 /// match status {
283 /// Some(status) => println!("GitHub status: {:?}", status),
284 /// None => println!("GitHub not connected"),
285 /// }
286 /// # Ok(())
287 /// # }
288 /// ```
289 pub async fn get_connection_status(
290 &self,
291 session_id: &str,
292 toolkit: &str,
293 ) -> Result<Option<ConnectionStatus>, ComposioError> {
294 let accounts = self.list_connections(session_id).await?;
295
296 Ok(accounts
297 .iter()
298 .find(|acc| acc.toolkit == toolkit)
299 .map(|acc| acc.status.clone()))
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status variant maps to its SCREAMING_SNAKE_CASE wire value, in
    /// both directions.
    #[test]
    fn test_connection_status_serialization() {
        let cases = [
            (ConnectionStatus::Active, "\"ACTIVE\""),
            (ConnectionStatus::Initiated, "\"INITIATED\""),
            (ConnectionStatus::Expired, "\"EXPIRED\""),
            (ConnectionStatus::Failed, "\"FAILED\""),
            (ConnectionStatus::Inactive, "\"INACTIVE\""),
        ];

        for (status, wire) in cases {
            let json = serde_json::to_string(&status).unwrap();
            assert_eq!(json, wire);

            let deserialized: ConnectionStatus = serde_json::from_str(wire).unwrap();
            assert_eq!(deserialized, status);
        }
    }

    /// A connected account survives a serialize/deserialize round trip with
    /// every field intact.
    #[test]
    fn test_connected_account_serialization() {
        let account = ConnectedAccount {
            id: "ca_123".to_string(),
            toolkit: "github".to_string(),
            status: ConnectionStatus::Active,
            user_id: "user_123".to_string(),
            created_at: "2024-01-01T00:00:00Z".to_string(),
            updated_at: "2024-01-02T00:00:00Z".to_string(),
        };

        let json = serde_json::to_string(&account).unwrap();
        assert!(json.contains("ca_123"));
        assert!(json.contains("github"));
        assert!(json.contains("ACTIVE"));

        let deserialized: ConnectedAccount = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.id, "ca_123");
        assert_eq!(deserialized.toolkit, "github");
        assert_eq!(deserialized.status, ConnectionStatus::Active);
        assert_eq!(deserialized.user_id, "user_123");
        assert_eq!(deserialized.created_at, "2024-01-01T00:00:00Z");
        assert_eq!(deserialized.updated_at, "2024-01-02T00:00:00Z");
    }

    /// A hand-written payload (the direction the type is normally used in)
    /// deserializes into the expected account.
    #[test]
    fn test_connected_account_deserialization() {
        let json = r#"{
            "id": "ca_456",
            "toolkit": "gmail",
            "status": "EXPIRED",
            "user_id": "user_456",
            "created_at": "2024-01-01T00:00:00Z",
            "updated_at": "2024-01-01T00:00:00Z"
        }"#;

        let account: ConnectedAccount = serde_json::from_str(json).unwrap();
        assert_eq!(account.id, "ca_456");
        assert_eq!(account.toolkit, "gmail");
        assert_eq!(account.status, ConnectionStatus::Expired);
        assert_eq!(account.user_id, "user_456");
    }

    /// An auth link with an account id serializes the id and round-trips.
    #[test]
    fn test_auth_link_serialization() {
        let link = AuthLink {
            redirect_url: "https://auth.composio.dev/...".to_string(),
            link_token: "lt_abc123".to_string(),
            expires_at: "2024-01-01T01:00:00Z".to_string(),
            connected_account_id: Some("ca_123".to_string()),
        };

        let json = serde_json::to_string(&link).unwrap();
        assert!(json.contains("redirect_url"));
        assert!(json.contains("lt_abc123"));
        assert!(json.contains("ca_123"));

        let deserialized: AuthLink = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.redirect_url, "https://auth.composio.dev/...");
        assert_eq!(deserialized.link_token, "lt_abc123");
        assert_eq!(deserialized.expires_at, "2024-01-01T01:00:00Z");
        assert_eq!(deserialized.connected_account_id.as_deref(), Some("ca_123"));
    }

    /// Without an account id the key is omitted on output, and a payload
    /// lacking the key still deserializes (to `None`).
    #[test]
    fn test_auth_link_without_account_id() {
        let link = AuthLink {
            redirect_url: "https://auth.composio.dev/...".to_string(),
            link_token: "lt_abc123".to_string(),
            expires_at: "2024-01-01T01:00:00Z".to_string(),
            connected_account_id: None,
        };

        let json = serde_json::to_string(&link).unwrap();
        assert!(!json.contains("connected_account_id"));

        let deserialized: AuthLink = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.link_token, "lt_abc123");
        assert!(deserialized.connected_account_id.is_none());
    }
}
365}