ultrafast_mcp_sequential_thinking/thinking/
client.rs

1//! # Client Implementation
2//!
3//! UltraFast MCP client implementation for sequential thinking.
4//!
5//! This module provides the main client implementation that connects to
6//! sequential thinking servers and manages thinking sessions.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use tracing::info;
12
13use ultrafast_mcp::{
14    ClientCapabilities, ClientInfo, ListToolsRequest, Tool, ToolCall, ToolContent, ToolResult,
15    UltraFastClient,
16};
17
18use crate::thinking::error::{SequentialThinkingError, SequentialThinkingResult};
19use crate::thinking::{ThinkingEngine, ThinkingProgress, ThinkingStats, ThoughtData};
20
21#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
22pub struct ClientThinkingConfig {
23    /// Whether to enable progress tracking
24    pub enable_progress_tracking: bool,
25    /// Auto-save interval in seconds
26    pub auto_save_interval: u64,
27    /// Whether to show thought visualization
28    pub show_thought_visualization: bool,
29    /// Maximum retry attempts for failed operations
30    pub max_retry_attempts: u32,
31    /// Timeout for individual operations in seconds
32    pub operation_timeout: u64,
33}
34
35impl Default for ClientThinkingConfig {
36    fn default() -> Self {
37        Self {
38            enable_progress_tracking: true,
39            auto_save_interval: 60,
40            show_thought_visualization: true,
41            max_retry_attempts: 3,
42            operation_timeout: 30,
43        }
44    }
45}
46
47/// Main sequential thinking client implementation
48pub struct SequentialThinkingClient {
49    /// Underlying MCP client
50    client: Arc<UltraFastClient>,
51    /// Client configuration
52    config: ClientThinkingConfig,
53    /// Active thinking sessions
54    sessions: Arc<RwLock<HashMap<String, ThinkingSession>>>,
55    /// Client statistics
56    stats: Arc<RwLock<ClientStats>>,
57    /// Progress tracker
58    progress_tracker: Arc<RwLock<ProgressTracker>>,
59}
60
61/// Client statistics
62#[derive(Debug, Clone, Default)]
63pub struct ClientStats {
64    /// Total requests made
65    pub total_requests: u64,
66    /// Total thoughts processed
67    pub total_thoughts: u64,
68    /// Total sessions created
69    pub total_sessions: u64,
70    /// Average response time in milliseconds
71    pub avg_response_time_ms: f64,
72    /// Total response time in milliseconds
73    pub total_response_time_ms: u64,
74    /// Error count
75    pub error_count: u64,
76    /// Retry count
77    pub retry_count: u64,
78}
79
80/// Progress tracking information
81#[derive(Debug, Clone)]
82pub struct ProgressTracker {
83    /// Current session progress
84    pub current_progress: Option<ThinkingProgress>,
85    /// Progress history
86    pub progress_history: Vec<ThinkingProgress>,
87    /// Last update timestamp
88    pub last_update: chrono::DateTime<chrono::Utc>,
89}
90
91impl Default for ProgressTracker {
92    fn default() -> Self {
93        Self {
94            current_progress: None,
95            progress_history: Vec::new(),
96            last_update: chrono::Utc::now(),
97        }
98    }
99}
100
101/// A thinking session managed by the client
102pub struct ThinkingSession {
103    /// Session ID
104    pub session_id: String,
105    /// Session title
106    pub title: String,
107    /// Local thinking engine
108    pub engine: ThinkingEngine,
109    /// Session metadata
110    pub metadata: HashMap<String, serde_json::Value>,
111    /// Created timestamp
112    pub created_at: chrono::DateTime<chrono::Utc>,
113    /// Last activity timestamp
114    pub last_activity: chrono::DateTime<chrono::Utc>,
115}
116
117impl ThinkingSession {
118    /// Create a new thinking session
119    pub fn new(session_id: String, title: String) -> Self {
120        Self {
121            session_id,
122            title,
123            engine: ThinkingEngine::new(),
124            metadata: HashMap::new(),
125            created_at: chrono::Utc::now(),
126            last_activity: chrono::Utc::now(),
127        }
128    }
129
130    /// Add metadata to the session
131    pub fn add_metadata(&mut self, key: String, value: serde_json::Value) {
132        self.metadata.insert(key, value);
133        self.last_activity = chrono::Utc::now();
134    }
135
136    /// Get session statistics
137    pub fn get_stats(&self) -> ThinkingStats {
138        self.engine.get_stats().clone()
139    }
140
141    /// Get session progress
142    pub fn get_progress(&self) -> ThinkingProgress {
143        self.engine.get_progress().clone()
144    }
145
146    /// Check if session is complete
147    pub fn is_complete(&self) -> bool {
148        self.engine.is_complete()
149    }
150}
151
152impl SequentialThinkingClient {
153    /// Create a new sequential thinking client
154    pub async fn new(server_url: &str) -> SequentialThinkingResult<Self> {
155        let client_info = ClientInfo {
156            name: "UltraFast MCP Sequential Thinking Client".to_string(),
157            version: "0.1.0".to_string(),
158            description: Some(
159                "High-performance Rust-based MCP client for sequential thinking".to_string(),
160            ),
161            homepage: Some(
162                "https://github.com/techgopal/ultrafast-mcp-sequential-thinking".to_string(),
163            ),
164            repository: Some(
165                "https://github.com/techgopal/ultrafast-mcp-sequential-thinking".to_string(),
166            ),
167            authors: Some(vec!["techgopal <techgopal2@gmail.com>".to_string()]),
168            license: Some("MIT".to_string()),
169        };
170        let client_capabilities = ClientCapabilities::default();
171        let client = UltraFastClient::new(client_info, client_capabilities);
172
173        let mut client_instance = Self {
174            client: Arc::new(client),
175            config: ClientThinkingConfig::default(),
176            sessions: Arc::new(RwLock::new(HashMap::new())),
177            stats: Arc::new(RwLock::new(ClientStats::default())),
178            progress_tracker: Arc::new(RwLock::new(ProgressTracker::default())),
179        };
180
181        // Connect to server
182        client_instance.connect(server_url).await?;
183
184        Ok(client_instance)
185    }
186
187    /// Create a new client with custom configuration
188    pub async fn with_config(
189        server_url: &str,
190        config: ClientThinkingConfig,
191    ) -> SequentialThinkingResult<Self> {
192        let client_info = ClientInfo {
193            name: "UltraFast MCP Sequential Thinking Client".to_string(),
194            version: "0.1.0".to_string(),
195            description: Some(
196                "High-performance Rust-based MCP client for sequential thinking".to_string(),
197            ),
198            homepage: Some(
199                "https://github.com/techgopal/ultrafast-mcp-sequential-thinking".to_string(),
200            ),
201            repository: Some(
202                "https://github.com/techgopal/ultrafast-mcp-sequential-thinking".to_string(),
203            ),
204            authors: Some(vec!["techgopal <techgopal2@gmail.com>".to_string()]),
205            license: Some("MIT".to_string()),
206        };
207        let client_capabilities = ClientCapabilities::default();
208        let client = UltraFastClient::new(client_info, client_capabilities);
209
210        let mut client_instance = Self {
211            client: Arc::new(client),
212            config,
213            sessions: Arc::new(RwLock::new(HashMap::new())),
214            stats: Arc::new(RwLock::new(ClientStats::default())),
215            progress_tracker: Arc::new(RwLock::new(ProgressTracker::default())),
216        };
217
218        // Connect to server
219        client_instance.connect(server_url).await?;
220
221        Ok(client_instance)
222    }
223
224    /// Connect to the server and initialize MCP connection
225    async fn connect(&mut self, server_url: &str) -> SequentialThinkingResult<()> {
226        info!("Connecting to server: {}", server_url);
227
228        // Parse server URL to determine transport type
229        if server_url.starts_with("stdio://") || server_url == "stdio" {
230            // Connect via STDIO
231            self.client.connect_stdio().await.map_err(|e| {
232                SequentialThinkingError::transport_error(format!(
233                    "Failed to connect via STDIO: {e}"
234                ))
235            })?;
236        } else if server_url.starts_with("http://") || server_url.starts_with("https://") {
237            // Connect via HTTP
238            self.client
239                .connect_streamable_http(server_url)
240                .await
241                .map_err(|e| {
242                    SequentialThinkingError::transport_error(format!(
243                        "Failed to connect via HTTP: {e}"
244                    ))
245                })?;
246        } else {
247            return Err(SequentialThinkingError::transport_error(format!(
248                "Unsupported server URL format: {server_url}"
249            )));
250        }
251
252        info!("Connected to server, initializing MCP connection...");
253
254        // Initialize the MCP connection
255        self.client.initialize().await.map_err(|e| {
256            SequentialThinkingError::transport_error(format!(
257                "Failed to initialize MCP connection: {e}"
258            ))
259        })?;
260
261        info!("MCP connection initialized successfully");
262        Ok(())
263    }
264
265    /// Start a new thinking session
266    pub async fn start_session(&self, title: String) -> SequentialThinkingResult<ThinkingSession> {
267        let session_id = uuid::Uuid::new_v4().to_string();
268        let mut session = ThinkingSession::new(session_id.clone(), title);
269
270        // Initialize the session
271        session.engine.start_session(session_id.clone());
272
273        // Store the session
274        {
275            let mut sessions = self.sessions.write().await;
276            sessions.insert(session_id.clone(), session.clone());
277        }
278
279        // Update statistics
280        {
281            let mut stats = self.stats.write().await;
282            stats.total_sessions += 1;
283        }
284
285        info!("Started new thinking session: {}", session_id);
286        Ok(session)
287    }
288
289    /// Get a thinking session by ID
290    pub async fn get_session(&self, session_id: &str) -> Option<ThinkingSession> {
291        let sessions = self.sessions.read().await;
292        sessions.get(session_id).cloned()
293    }
294
295    /// Add a thought to a session
296    pub async fn add_thought(
297        &self,
298        _session_id: &str,
299        thought: ThoughtData,
300    ) -> SequentialThinkingResult<ThoughtData> {
301        let start_time = std::time::Instant::now();
302
303        // Update request statistics
304        {
305            let mut stats = self.stats.write().await;
306            stats.total_requests += 1;
307        }
308
309        // Process thought locally first
310        let mut sessions = self.sessions.write().await;
311        let session = sessions.get_mut(_session_id).ok_or_else(|| {
312            SequentialThinkingError::not_found(format!("Session not found: {_session_id}"))
313        })?;
314
315        let processed_thought = session
316            .engine
317            .process_thought(thought.clone())
318            .await
319            .map_err(SequentialThinkingError::processing_error)?;
320
321        // Send thought to server
322        let server_result = self.send_thought_to_server(thought).await;
323
324        // Update response time statistics
325        {
326            let response_time = start_time.elapsed();
327            let mut stats = self.stats.write().await;
328            stats.total_response_time_ms += response_time.as_millis() as u64;
329            stats.avg_response_time_ms =
330                stats.total_response_time_ms as f64 / stats.total_requests as f64;
331
332            if server_result.is_ok() {
333                stats.total_thoughts += 1;
334            } else {
335                stats.error_count += 1;
336            }
337        }
338
339        // Update progress tracking
340        if self.config.enable_progress_tracking {
341            self.update_progress_tracking(&processed_thought).await;
342        }
343
344        // Update session activity
345        session.last_activity = chrono::Utc::now();
346
347        Ok(processed_thought)
348    }
349
350    /// Send a thought to the server
351    async fn send_thought_to_server(
352        &self,
353        thought: ThoughtData,
354    ) -> SequentialThinkingResult<ToolResult> {
355        let args = serde_json::json!({
356            "thought": thought.thought,
357            "thoughtNumber": thought.thought_number,
358            "totalThoughts": thought.total_thoughts,
359            "nextThoughtNeeded": thought.next_thought_needed,
360            "isRevision": thought.is_revision,
361            "revisesThought": thought.revises_thought,
362            "branchFromThought": thought.branch_from_thought,
363            "branchId": thought.branch_id,
364            "needsMoreThoughts": thought.needs_more_thoughts
365        });
366
367        let tool_call = ToolCall {
368            name: "sequential_thinking".to_string(),
369            arguments: Some(args),
370        };
371
372        let mut attempts = 0;
373        loop {
374            match self.client.call_tool(tool_call.clone()).await {
375                Ok(result) => return Ok(result),
376                Err(e) => {
377                    attempts += 1;
378                    if attempts >= self.config.max_retry_attempts {
379                        return Err(SequentialThinkingError::transport_error(e.to_string()));
380                    }
381
382                    // Update retry statistics
383                    {
384                        let mut stats = self.stats.write().await;
385                        stats.retry_count += 1;
386                    }
387
388                    // Wait before retrying
389                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
390                }
391            }
392        }
393    }
394
395    /// Export a session
396    pub async fn export_session(
397        &self,
398        _session_id: &str,
399        format: &str,
400    ) -> SequentialThinkingResult<String> {
401        let args = serde_json::json!({
402            "format": format
403        });
404
405        let tool_call = ToolCall {
406            name: "export_session".to_string(),
407            arguments: Some(args),
408        };
409
410        let result = self
411            .client
412            .call_tool(tool_call)
413            .await
414            .map_err(|e| SequentialThinkingError::transport_error(e.to_string()))?;
415
416        // Extract content from result
417        if let Some(content) = result.content.first() {
418            match content {
419                ToolContent::Text { text } => Ok(text.clone()),
420                _ => Err(SequentialThinkingError::serialization_error(
421                    "Unexpected content type in export result".to_string(),
422                )),
423            }
424        } else {
425            Err(SequentialThinkingError::serialization_error(
426                "No content in export result".to_string(),
427            ))
428        }
429    }
430
431    /// Analyze a session
432    pub async fn analyze_session(
433        &self,
434        _session_id: &str,
435    ) -> SequentialThinkingResult<serde_json::Value> {
436        let tool_call = ToolCall {
437            name: "analyze_session".to_string(),
438            arguments: Some(serde_json::json!({})),
439        };
440
441        let result = self
442            .client
443            .call_tool(tool_call)
444            .await
445            .map_err(|e| SequentialThinkingError::transport_error(e.to_string()))?;
446
447        // Extract content from result
448        if let Some(content) = result.content.first() {
449            match content {
450                ToolContent::Text { text } => serde_json::from_str(text)
451                    .map_err(|e| SequentialThinkingError::serialization_error(e.to_string())),
452                _ => Err(SequentialThinkingError::serialization_error(
453                    "Unexpected content type in analysis result".to_string(),
454                )),
455            }
456        } else {
457            Err(SequentialThinkingError::serialization_error(
458                "No content in analysis result".to_string(),
459            ))
460        }
461    }
462
463    /// Get available tools from the server
464    pub async fn list_tools(&self) -> SequentialThinkingResult<Vec<Tool>> {
465        let tools = self
466            .client
467            .list_tools(ListToolsRequest { cursor: None })
468            .await
469            .map_err(|e| SequentialThinkingError::transport_error(e.to_string()))?;
470
471        Ok(tools.tools)
472    }
473
474    /// Get client statistics
475    pub async fn get_stats(&self) -> ClientStats {
476        self.stats.read().await.clone()
477    }
478
479    /// Get current progress
480    pub async fn get_progress(&self) -> Option<ThinkingProgress> {
481        let tracker = self.progress_tracker.read().await;
482        tracker.current_progress.clone()
483    }
484
485    /// Update progress tracking
486    async fn update_progress_tracking(&self, thought: &ThoughtData) {
487        let mut tracker = self.progress_tracker.write().await;
488        let progress = self.calculate_progress(thought);
489
490        if let Some(ref current) = tracker.current_progress {
491            let current = current.clone();
492            tracker.progress_history.push(current);
493        }
494
495        tracker.current_progress = Some(progress);
496        tracker.last_update = chrono::Utc::now();
497    }
498
499    /// Calculate progress from a thought
500    fn calculate_progress(&self, thought: &ThoughtData) -> ThinkingProgress {
501        ThinkingProgress::new(thought.thought_number, thought.total_thoughts)
502    }
503
504    /// Complete a session
505    pub async fn complete_session(&self, session_id: &str) -> SequentialThinkingResult<()> {
506        let mut sessions = self.sessions.write().await;
507        if let Some(session) = sessions.get_mut(session_id) {
508            // Mark session as complete
509            session.last_activity = chrono::Utc::now();
510            info!("Completed thinking session: {}", session_id);
511            Ok(())
512        } else {
513            Err(SequentialThinkingError::not_found(format!(
514                "Session not found: {session_id}"
515            )))
516        }
517    }
518
519    /// Remove a session
520    pub async fn remove_session(&self, session_id: &str) -> bool {
521        let mut sessions = self.sessions.write().await;
522        sessions.remove(session_id).is_some()
523    }
524
525    /// Get all session IDs
526    pub async fn get_session_ids(&self) -> Vec<String> {
527        let sessions = self.sessions.read().await;
528        sessions.keys().cloned().collect()
529    }
530
531    /// Get session statistics
532    pub async fn get_session_stats(&self, session_id: &str) -> Option<ThinkingStats> {
533        let sessions = self.sessions.read().await;
534        sessions.get(session_id).map(|s| s.get_stats())
535    }
536
537    /// Get session progress
538    pub async fn get_session_progress(&self, session_id: &str) -> Option<ThinkingProgress> {
539        let sessions = self.sessions.read().await;
540        sessions.get(session_id).map(|s| s.get_progress())
541    }
542
543    /// Check if a session is complete
544    pub async fn is_session_complete(&self, session_id: &str) -> bool {
545        let sessions = self.sessions.read().await;
546        sessions
547            .get(session_id)
548            .map(|s| s.is_complete())
549            .unwrap_or(false)
550    }
551}
552
553impl Clone for ThinkingSession {
554    fn clone(&self) -> Self {
555        Self {
556            session_id: self.session_id.clone(),
557            title: self.title.clone(),
558            engine: self.engine.clone(),
559            metadata: self.metadata.clone(),
560            created_at: self.created_at,
561            last_activity: self.last_activity,
562        }
563    }
564}
565
566impl Clone for ThinkingEngine {
567    fn clone(&self) -> Self {
568        // Note: This is a simplified clone implementation
569        // In a real implementation, you might want to implement proper cloning
570        Self::new()
571    }
572}
573
574#[cfg(test)]
575mod tests {
576    use super::*;
577
578    #[test]
579    fn test_client_creation() {
580        // This test would require a mock server or actual server running
581        // For now, we'll just test the configuration
582        let config = ClientThinkingConfig::default();
583        assert!(config.enable_progress_tracking);
584        assert_eq!(config.auto_save_interval, 60);
585    }
586
587    #[test]
588    fn test_thinking_session_creation() {
589        let session = ThinkingSession::new("test-session".to_string(), "Test Session".to_string());
590        assert_eq!(session.session_id, "test-session");
591        assert_eq!(session.title, "Test Session");
592        assert!(!session.is_complete());
593    }
594
595    #[test]
596    fn test_progress_calculation() {
597        // Use dummy ClientInfo and ClientCapabilities for UltraFastClient
598        let client_info = ClientInfo {
599            name: "Test Client".to_string(),
600            version: "0.0.1".to_string(),
601            description: None,
602            homepage: None,
603            repository: None,
604            authors: None,
605            license: None,
606        };
607        let client_capabilities = ClientCapabilities::default();
608        let client = UltraFastClient::new(client_info, client_capabilities);
609
610        let client = SequentialThinkingClient {
611            client: Arc::new(client),
612            config: ClientThinkingConfig::default(),
613            sessions: Arc::new(RwLock::new(HashMap::new())),
614            stats: Arc::new(RwLock::new(ClientStats::default())),
615            progress_tracker: Arc::new(RwLock::new(ProgressTracker::default())),
616        };
617
618        let thought = ThoughtData::new("Test thought".to_string(), 3, 5);
619        let progress = client.calculate_progress(&thought);
620
621        assert_eq!(progress.current_thought, 3);
622        assert_eq!(progress.total_thoughts, 5);
623        assert_eq!(progress.completed_thoughts, 2);
624    }
625}