// reasonkit/engine/reasoning_loop.rs

1//! # Async ReasoningLoop Engine
2//!
3//! High-performance async reasoning engine with Tokio-based concurrency,
4//! memory integration, and streaming support.
5//!
6//! ## Design Principles
7//!
8//! 1. **Async-First**: All I/O operations are non-blocking
9//! 2. **Channel-Based Concurrency**: ThinkTools execute via broadcast channels
10//! 3. **Streaming Output**: Real-time step-by-step reasoning visibility
11//! 4. **Memory Integration**: Optional reasonkit-mem for context enrichment
12//! 5. **Profile System**: quick/balanced/deep/paranoid execution modes
13//!
14//! ## Performance Characteristics
15//!
16//! - Concurrent ThinkTool execution reduces latency by (N-1)/N for N independent tools
17//! - Zero-copy streaming via tokio broadcast channels
18//! - Bounded channel backpressure prevents memory exhaustion
19//! - Connection pooling for LLM API calls
20
21use async_trait::async_trait;
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use thiserror::Error;
27use tokio::sync::{broadcast, RwLock};
28use uuid::Uuid;
29
30// ============================================================================
31// ERROR TYPES
32// ============================================================================
33
/// Errors that can occur during reasoning loop execution
///
/// Library-style error enum (derived via `thiserror`); callers match on
/// variants to distinguish transient failures (`Timeout`, `LlmError`) from
/// setup mistakes (`Config`, `ProfileNotFound`).
#[derive(Error, Debug)]
pub enum ReasoningError {
    /// ThinkTool execution failed
    #[error("ThinkTool '{tool}' failed: {message}")]
    ThinkToolFailed { tool: String, message: String },

    /// Memory query failed
    #[error("Memory query failed: {0}")]
    MemoryQueryFailed(String),

    /// Profile not found
    #[error("Profile '{0}' not found")]
    ProfileNotFound(String),

    /// Channel communication error
    #[error("Channel error: {0}")]
    ChannelError(String),

    /// Timeout during execution (carries the configured limit that was hit)
    #[error("Execution timed out after {0:?}")]
    Timeout(Duration),

    /// Configuration error (e.g. builder missing a required component)
    #[error("Configuration error: {0}")]
    Config(String),

    /// LLM provider error
    #[error("LLM error: {0}")]
    LlmError(String),

    /// Cancelled by user
    // NOTE(review): not constructed anywhere in this file — presumably
    // raised by external cancellation plumbing; confirm.
    #[error("Execution cancelled")]
    Cancelled,

    /// Confidence threshold not met
    // NOTE(review): not constructed in this file — presumably used by
    // callers enforcing `Profile::min_confidence`; confirm.
    #[error("Confidence {actual:.2} below threshold {required:.2}")]
    ConfidenceBelowThreshold { actual: f64, required: f64 },
}
73
/// Result type for reasoning operations
///
/// Module-local alias; shadows the prelude `Result`. Use
/// `std::result::Result` explicitly where the std type is needed.
pub type Result<T> = std::result::Result<T, ReasoningError>;
76
77// ============================================================================
78// PROFILE SYSTEM
79// ============================================================================
80
/// Reasoning profiles that determine ThinkTool chains and confidence targets
///
/// Serialized in lowercase (`"quick"`, `"balanced"`, …), matching the
/// `Display` impl below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum Profile {
    /// Fast 2-step: GigaThink -> LaserLogic (70% confidence target)
    Quick,

    /// Standard 4-step: GigaThink -> LaserLogic -> BedRock -> ProofGuard (80%)
    #[default]
    Balanced,

    /// Thorough 5-step with meta-cognition (85% confidence target)
    Deep,

    /// Maximum verification with adversarial critique (95% confidence target)
    Paranoid,
}
98
99impl Profile {
100    /// Get the ThinkTool chain for this profile
101    pub fn thinktool_chain(&self) -> Vec<&'static str> {
102        match self {
103            Profile::Quick => vec!["gigathink", "laserlogic"],
104            Profile::Balanced => vec!["gigathink", "laserlogic", "bedrock", "proofguard"],
105            Profile::Deep => vec![
106                "gigathink",
107                "laserlogic",
108                "bedrock",
109                "proofguard",
110                "brutalhonesty",
111            ],
112            Profile::Paranoid => vec![
113                "gigathink",
114                "laserlogic",
115                "bedrock",
116                "proofguard",
117                "brutalhonesty",
118                "proofguard", // Second verification pass
119            ],
120        }
121    }
122
123    /// Get the minimum confidence threshold for this profile
124    pub fn min_confidence(&self) -> f64 {
125        match self {
126            Profile::Quick => 0.70,
127            Profile::Balanced => 0.80,
128            Profile::Deep => 0.85,
129            Profile::Paranoid => 0.95,
130        }
131    }
132
133    /// Get the maximum token budget for this profile
134    pub fn token_budget(&self) -> u32 {
135        match self {
136            Profile::Quick => 3_000,
137            Profile::Balanced => 8_000,
138            Profile::Deep => 12_000,
139            Profile::Paranoid => 25_000,
140        }
141    }
142
143    /// Parse profile from string
144    pub fn parse_profile(s: &str) -> Option<Self> {
145        match s.to_lowercase().as_str() {
146            "quick" | "q" => Some(Profile::Quick),
147            "balanced" | "b" => Some(Profile::Balanced),
148            "deep" | "d" => Some(Profile::Deep),
149            "paranoid" | "p" => Some(Profile::Paranoid),
150            _ => None,
151        }
152    }
153}
154
155impl std::fmt::Display for Profile {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        match self {
158            Profile::Quick => write!(f, "quick"),
159            Profile::Balanced => write!(f, "balanced"),
160            Profile::Deep => write!(f, "deep"),
161            Profile::Paranoid => write!(f, "paranoid"),
162        }
163    }
164}
165
166// ============================================================================
167// CONFIGURATION
168// ============================================================================
169
/// Configuration for the ReasoningLoop
///
/// Every field carries a serde default, so a partially-specified (or empty)
/// config document deserializes to the same values as `Self::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReasoningConfig {
    /// Default profile to use
    #[serde(default)]
    pub default_profile: Profile,

    /// Maximum execution time
    /// (checked between ThinkTool steps in `execute_loop`, so a single
    /// in-flight step may overrun it)
    #[serde(default = "default_timeout")]
    pub timeout: Duration,

    /// Enable parallel ThinkTool execution when possible
    // NOTE(review): not consulted by the visible `execute_loop`, which runs
    // the chain sequentially — confirm where this takes effect.
    #[serde(default = "default_true")]
    pub enable_parallel: bool,

    /// Maximum concurrent ThinkTool executions
    #[serde(default = "default_max_concurrent")]
    pub max_concurrent: usize,

    /// Enable memory integration
    #[serde(default)]
    pub enable_memory: bool,

    /// Number of memory results to retrieve
    #[serde(default = "default_memory_top_k")]
    pub memory_top_k: usize,

    /// Minimum relevance score for memory results
    #[serde(default = "default_memory_min_score")]
    pub memory_min_score: f32,

    /// Enable streaming output
    #[serde(default = "default_true")]
    pub enable_streaming: bool,

    /// Streaming buffer size (capacity of the broadcast event channel;
    /// slow receivers lag past this many buffered events)
    #[serde(default = "default_stream_buffer")]
    pub stream_buffer_size: usize,

    /// LLM temperature (0.0-1.0)
    #[serde(default = "default_temperature")]
    pub temperature: f64,

    /// LLM max tokens per step
    #[serde(default = "default_max_tokens")]
    pub max_tokens: u32,

    /// Retry failed steps
    #[serde(default = "default_true")]
    pub retry_on_failure: bool,

    /// Maximum retries per step
    #[serde(default = "default_max_retries")]
    pub max_retries: u32,
}
225
// Serde default helpers. Referenced by the `#[serde(default = "...")]`
// attributes on `ReasoningConfig`; its `Default` impl should mirror these
// so the two construction paths stay in sync.

fn default_timeout() -> Duration {
    Duration::from_secs(300) // 5 minutes
}
// Shared default for boolean toggles that start enabled.
fn default_true() -> bool {
    true
}
fn default_max_concurrent() -> usize {
    4
}
fn default_memory_top_k() -> usize {
    10
}
fn default_memory_min_score() -> f32 {
    0.5
}
fn default_stream_buffer() -> usize {
    32
}
fn default_temperature() -> f64 {
    0.7
}
fn default_max_tokens() -> u32 {
    2048
}
fn default_max_retries() -> u32 {
    2
}
253
254impl Default for ReasoningConfig {
255    fn default() -> Self {
256        Self {
257            default_profile: Profile::Balanced,
258            timeout: default_timeout(),
259            enable_parallel: true,
260            max_concurrent: default_max_concurrent(),
261            enable_memory: false,
262            memory_top_k: default_memory_top_k(),
263            memory_min_score: default_memory_min_score(),
264            enable_streaming: true,
265            stream_buffer_size: default_stream_buffer(),
266            temperature: default_temperature(),
267            max_tokens: default_max_tokens(),
268            retry_on_failure: true,
269            max_retries: default_max_retries(),
270        }
271    }
272}
273
274// ============================================================================
275// MEMORY INTEGRATION
276// ============================================================================
277
/// Context retrieved from memory
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryContext {
    /// Retrieved chunks with relevance scores
    pub chunks: Vec<MemoryChunk>,

    /// Query that was used
    pub query: String,

    /// Time taken for retrieval, in milliseconds
    pub retrieval_time_ms: u64,

    /// Whether RAPTOR tree was used
    // NOTE(review): presumably refers to hierarchical retrieval in the
    // memory backend; nothing in this file reads this flag — confirm.
    pub used_raptor: bool,
}
293
/// A chunk retrieved from memory
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryChunk {
    /// Chunk ID
    pub id: Uuid,

    /// Document ID this chunk belongs to
    pub doc_id: Uuid,

    /// Text content
    pub text: String,

    /// Relevance score
    // NOTE(review): scale not specified here; `memory_min_score` (default
    // 0.5) suggests a 0.0-1.0 range — confirm with the memory backend.
    pub score: f32,

    /// Source metadata (e.g. origin identifier), if the backend provides it
    pub source: Option<String>,
}
312
/// Trait for memory providers (enables mock testing and multiple backends)
#[async_trait]
pub trait MemoryProvider: Send + Sync {
    /// Query memory for relevant context
    ///
    /// `top_k` bounds the number of chunks returned; chunks scoring below
    /// `min_score` should be filtered out by the implementation.
    async fn query(&self, query: &str, top_k: usize, min_score: f32) -> Result<MemoryContext>;

    /// Store a reasoning session for future reference
    ///
    /// Called best-effort at the end of `execute_loop`; errors returned
    /// here are ignored by the caller.
    async fn store_session(&self, session: &ReasoningSession) -> Result<()>;
}
322
323// ============================================================================
324// THINKTOOL EXECUTION
325// ============================================================================
326
/// Result from a ThinkTool execution
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThinkToolResult {
    /// ThinkTool that was executed
    pub tool_id: String,

    /// Output content (free-form text produced by the tool)
    pub content: String,

    /// Confidence score (0.0-1.0)
    pub confidence: f64,

    /// Execution time in milliseconds
    pub duration_ms: u64,

    /// Token usage for this single execution
    pub tokens: TokenUsage,

    /// Structured output (if applicable)
    pub structured: Option<serde_json::Value>,

    /// Warnings or notes
    pub notes: Vec<String>,
}
351
352/// Token usage statistics
353#[derive(Debug, Clone, Default, Serialize, Deserialize)]
354pub struct TokenUsage {
355    pub input_tokens: u32,
356    pub output_tokens: u32,
357    pub total_tokens: u32,
358    pub cost_usd: f64,
359}
360
361impl TokenUsage {
362    pub fn add(&mut self, other: &TokenUsage) {
363        self.input_tokens += other.input_tokens;
364        self.output_tokens += other.output_tokens;
365        self.total_tokens += other.total_tokens;
366        self.cost_usd += other.cost_usd;
367    }
368}
369
/// Trait for ThinkTool execution (enables custom implementations)
#[async_trait]
pub trait ThinkToolExecutor: Send + Sync {
    /// Execute a ThinkTool with the given input
    ///
    /// `tool_id` is one of the identifiers produced by
    /// `Profile::thinktool_chain`; `context` carries session state and
    /// outputs of previously executed tools.
    async fn execute(
        &self,
        tool_id: &str,
        input: &str,
        context: &ExecutionContext,
    ) -> Result<ThinkToolResult>;

    /// List available ThinkTools
    fn available_tools(&self) -> Vec<&str>;
}
384
/// Context for ThinkTool execution
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Session ID
    pub session_id: Uuid,

    /// Current profile
    pub profile: Profile,

    /// Memory context (if available)
    pub memory: Option<MemoryContext>,

    /// Previous step outputs, keyed by tool id
    // NOTE(review): because the key is the tool id, a tool that appears
    // twice in a chain (e.g. "proofguard" under `Paranoid`) overwrites its
    // earlier result — confirm this is intended.
    pub previous_outputs: HashMap<String, ThinkToolResult>,

    /// LLM temperature
    pub temperature: f64,

    /// Max tokens per step
    pub max_tokens: u32,
}
406
407// ============================================================================
408// STREAMING OUTPUT
409// ============================================================================
410
/// Events emitted during reasoning loop execution
///
/// Serialized with an internal `"type"` tag in snake_case, suitable for
/// forwarding over JSON transports.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReasoningEvent {
    /// Reasoning session started
    SessionStarted {
        session_id: Uuid,
        profile: Profile,
        prompt: String,
    },

    /// Memory query completed
    MemoryQueried {
        chunks_found: usize,
        retrieval_time_ms: u64,
    },

    /// ThinkTool step started
    StepStarted {
        step_index: usize,
        total_steps: usize,
        tool_id: String,
    },

    /// ThinkTool step completed
    StepCompleted {
        step_index: usize,
        tool_id: String,
        confidence: f64,
        duration_ms: u64,
    },

    /// Partial output from a step (for streaming LLM responses)
    PartialOutput { tool_id: String, delta: String },

    /// Warning or note (e.g. a non-fatal memory-query failure)
    Warning { message: String },

    /// Final decision reached
    DecisionReached {
        confidence: f64,
        total_duration_ms: u64,
    },

    /// Error occurred
    Error { message: String },

    /// Session completed
    SessionCompleted { success: bool },
}
461
/// Handle for receiving streaming events
pub struct StreamHandle {
    // Subscriber end of the session's broadcast channel.
    receiver: broadcast::Receiver<ReasoningEvent>,
    // Session this handle is attached to.
    session_id: Uuid,
}
467
468impl StreamHandle {
469    /// Receive the next event
470    pub async fn next(&mut self) -> Option<ReasoningEvent> {
471        loop {
472            match self.receiver.recv().await {
473                Ok(event) => return Some(event),
474                Err(broadcast::error::RecvError::Closed) => return None,
475                Err(broadcast::error::RecvError::Lagged(_)) => {
476                    // If we lagged, continue loop to get next available (non-recursive)
477                    continue;
478                }
479            }
480        }
481    }
482
483    /// Get session ID
484    pub fn session_id(&self) -> Uuid {
485        self.session_id
486    }
487}
488
489// ============================================================================
490// REASONING STEP & DECISION
491// ============================================================================
492
/// Kind of reasoning step
// NOTE(review): the visible `execute_loop` only produces `ThinkTool` steps;
// the other variants are presumably produced elsewhere — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepKind {
    /// Memory retrieval
    MemoryQuery,
    /// ThinkTool execution
    ThinkTool { tool_id: String },
    /// Synthesis of previous steps
    Synthesis,
    /// Validation pass
    Validation,
}
506
/// A single step in the reasoning process
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReasoningStep {
    /// Step index (0-based)
    pub index: usize,

    /// Kind of step
    pub kind: StepKind,

    /// Input to this step
    pub input: String,

    /// Output from this step
    pub output: String,

    /// Confidence score (0.0-1.0)
    pub confidence: f64,

    /// Duration in milliseconds
    pub duration_ms: u64,

    /// Token usage for this step
    pub tokens: TokenUsage,

    /// Whether step succeeded
    pub success: bool,

    /// Error message if failed
    pub error: Option<String>,
}
537
/// Final decision from reasoning loop
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Decision {
    /// Unique decision ID
    pub id: Uuid,

    /// Session ID this decision belongs to
    pub session_id: Uuid,

    /// Original prompt
    pub prompt: String,

    /// Profile used
    pub profile: Profile,

    /// Final conclusion/answer
    pub conclusion: String,

    /// Overall confidence (0.0-1.0); mean of step confidences
    pub confidence: f64,

    /// All reasoning steps taken
    pub steps: Vec<ReasoningStep>,

    /// Total token usage across all steps
    pub total_tokens: TokenUsage,

    /// Total duration in milliseconds
    pub total_duration_ms: u64,

    /// Memory context used (if any)
    pub memory_context: Option<MemoryContext>,

    /// Whether reasoning succeeded
    // NOTE(review): `ReasoningSession::complete` sets this to `true`
    // unconditionally — confirm callers use `meets_threshold` separately.
    pub success: bool,

    /// Key insights extracted
    pub insights: Vec<String>,

    /// Caveats or limitations noted
    pub caveats: Vec<String>,

    /// Timestamp (set at completion time)
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
583
584impl Decision {
585    /// Check if confidence meets profile threshold
586    pub fn meets_threshold(&self) -> bool {
587        self.confidence >= self.profile.min_confidence()
588    }
589
590    /// Get a summary of the decision
591    pub fn summary(&self) -> String {
592        format!(
593            "[{}] {} (confidence: {:.0}%, {} steps, {}ms)",
594            self.profile,
595            if self.success { "SUCCESS" } else { "FAILED" },
596            self.confidence * 100.0,
597            self.steps.len(),
598            self.total_duration_ms
599        )
600    }
601}
602
603// ============================================================================
604// REASONING SESSION
605// ============================================================================
606
/// Stateful reasoning session with accumulated context
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReasoningSession {
    /// Session ID
    pub id: Uuid,

    /// Profile being used
    pub profile: Profile,

    /// Original prompt
    pub prompt: String,

    /// Current step index (always equal to `steps.len()`; advanced by
    /// `add_step`)
    pub current_step: usize,

    /// All steps executed
    pub steps: Vec<ReasoningStep>,

    /// Memory context retrieved for this session, if any
    pub memory_context: Option<MemoryContext>,

    /// Accumulated token usage across all steps
    pub total_tokens: TokenUsage,

    /// Session start time
    pub started_at: chrono::DateTime<chrono::Utc>,

    /// Whether session is complete
    pub completed: bool,

    /// Final decision (if completed)
    pub decision: Option<Decision>,
}
640
641impl ReasoningSession {
642    /// Create a new session
643    pub fn new(prompt: &str, profile: Profile) -> Self {
644        Self {
645            id: Uuid::new_v4(),
646            profile,
647            prompt: prompt.to_string(),
648            current_step: 0,
649            steps: Vec::new(),
650            memory_context: None,
651            total_tokens: TokenUsage::default(),
652            started_at: chrono::Utc::now(),
653            completed: false,
654            decision: None,
655        }
656    }
657
658    /// Add a completed step
659    pub fn add_step(&mut self, step: ReasoningStep) {
660        self.total_tokens.add(&step.tokens);
661        self.steps.push(step);
662        self.current_step += 1;
663    }
664
665    /// Get current confidence (average of all steps)
666    pub fn current_confidence(&self) -> f64 {
667        if self.steps.is_empty() {
668            0.0
669        } else {
670            self.steps.iter().map(|s| s.confidence).sum::<f64>() / self.steps.len() as f64
671        }
672    }
673
674    /// Complete the session with a decision
675    pub fn complete(&mut self, conclusion: String, insights: Vec<String>, caveats: Vec<String>) {
676        let total_duration_ms = (chrono::Utc::now() - self.started_at).num_milliseconds() as u64;
677
678        self.decision = Some(Decision {
679            id: Uuid::new_v4(),
680            session_id: self.id,
681            prompt: self.prompt.clone(),
682            profile: self.profile,
683            conclusion,
684            confidence: self.current_confidence(),
685            steps: self.steps.clone(),
686            total_tokens: self.total_tokens.clone(),
687            total_duration_ms,
688            memory_context: self.memory_context.clone(),
689            success: true,
690            insights,
691            caveats,
692            timestamp: chrono::Utc::now(),
693        });
694
695        self.completed = true;
696    }
697}
698
699// ============================================================================
700// REASONING LOOP BUILDER
701// ============================================================================
702
/// Builder for ReasoningLoop with fluent API
pub struct ReasoningLoopBuilder {
    // Accumulated configuration (starts at `ReasoningConfig::default()`).
    config: ReasoningConfig,
    // Required before `build()`; `None` makes `build` fail.
    executor: Option<Arc<dyn ThinkToolExecutor>>,
    // Optional; setting it also flips `config.enable_memory` on.
    memory: Option<Arc<dyn MemoryProvider>>,
}
709
710impl ReasoningLoopBuilder {
711    /// Create a new builder with default config
712    pub fn new() -> Self {
713        Self {
714            config: ReasoningConfig::default(),
715            executor: None,
716            memory: None,
717        }
718    }
719
720    /// Set the default profile
721    pub fn with_profile(mut self, profile: Profile) -> Self {
722        self.config.default_profile = profile;
723        self
724    }
725
726    /// Set the execution timeout
727    pub fn with_timeout(mut self, timeout: Duration) -> Self {
728        self.config.timeout = timeout;
729        self
730    }
731
732    /// Enable parallel execution
733    pub fn with_parallel(mut self, enabled: bool, max_concurrent: usize) -> Self {
734        self.config.enable_parallel = enabled;
735        self.config.max_concurrent = max_concurrent;
736        self
737    }
738
739    /// Set the ThinkTool executor
740    pub fn with_executor(mut self, executor: Arc<dyn ThinkToolExecutor>) -> Self {
741        self.executor = Some(executor);
742        self
743    }
744
745    /// Set the memory provider
746    pub fn with_memory(mut self, memory: Arc<dyn MemoryProvider>) -> Self {
747        self.memory = Some(memory);
748        self.config.enable_memory = true;
749        self
750    }
751
752    /// Configure memory retrieval
753    pub fn with_memory_config(mut self, top_k: usize, min_score: f32) -> Self {
754        self.config.memory_top_k = top_k;
755        self.config.memory_min_score = min_score;
756        self
757    }
758
759    /// Enable streaming output
760    pub fn with_streaming(mut self, enabled: bool, buffer_size: usize) -> Self {
761        self.config.enable_streaming = enabled;
762        self.config.stream_buffer_size = buffer_size;
763        self
764    }
765
766    /// Set LLM parameters
767    pub fn with_llm_params(mut self, temperature: f64, max_tokens: u32) -> Self {
768        self.config.temperature = temperature;
769        self.config.max_tokens = max_tokens;
770        self
771    }
772
773    /// Set retry configuration
774    pub fn with_retries(mut self, enabled: bool, max_retries: u32) -> Self {
775        self.config.retry_on_failure = enabled;
776        self.config.max_retries = max_retries;
777        self
778    }
779
780    /// Set the full config
781    pub fn with_config(mut self, config: ReasoningConfig) -> Self {
782        self.config = config;
783        self
784    }
785
786    /// Build the ReasoningLoop
787    pub fn build(self) -> Result<ReasoningLoop> {
788        let executor = self
789            .executor
790            .ok_or_else(|| ReasoningError::Config("ThinkTool executor required".into()))?;
791
792        Ok(ReasoningLoop {
793            config: self.config,
794            executor,
795            memory: self.memory,
796            active_sessions: Arc::new(RwLock::new(HashMap::new())),
797            _event_sender: None,
798        })
799    }
800}
801
impl Default for ReasoningLoopBuilder {
    /// Equivalent to [`ReasoningLoopBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
807
808// ============================================================================
809// REASONING LOOP - MAIN ENGINE
810// ============================================================================
811
/// High-performance async reasoning engine
///
/// The ReasoningLoop orchestrates ThinkTool execution with:
/// - Async/concurrent execution via Tokio
/// - Optional memory integration via reasonkit-mem
/// - Streaming output via broadcast channels
/// - Profile-based execution modes
pub struct ReasoningLoop {
    /// Configuration
    config: ReasoningConfig,

    /// ThinkTool executor
    executor: Arc<dyn ThinkToolExecutor>,

    /// Memory provider (optional)
    memory: Option<Arc<dyn MemoryProvider>>,

    /// Active sessions, keyed by session id (entries live only for the
    /// duration of a `reason_stream*` call)
    active_sessions: Arc<RwLock<HashMap<Uuid, ReasoningSession>>>,

    /// Event broadcaster (lazily initialized)
    // NOTE(review): the builder always sets this to `None`, and streaming
    // channels are created per-call in `reason_stream_with_profile` — this
    // field appears unused; confirm before relying on it.
    _event_sender: Option<broadcast::Sender<ReasoningEvent>>,
}
835
836impl ReasoningLoop {
    /// Create a builder for ReasoningLoop
    ///
    /// Convenience for [`ReasoningLoopBuilder::new`].
    pub fn builder() -> ReasoningLoopBuilder {
        ReasoningLoopBuilder::new()
    }
841
    /// Get configuration (read-only view)
    pub fn config(&self) -> &ReasoningConfig {
        &self.config
    }
846
    /// Execute reasoning with streaming output
    ///
    /// Delegates to [`Self::reason_stream_with_profile`] using the
    /// configured default profile.
    pub async fn reason_stream(&self, prompt: &str) -> Result<(StreamHandle, Decision)> {
        self.reason_stream_with_profile(prompt, self.config.default_profile)
            .await
    }
852
853    /// Execute reasoning with streaming output and custom profile
854    pub async fn reason_stream_with_profile(
855        &self,
856        prompt: &str,
857        profile: Profile,
858    ) -> Result<(StreamHandle, Decision)> {
859        // Create broadcast channel for events
860        let (tx, rx) = broadcast::channel(self.config.stream_buffer_size);
861
862        let session = ReasoningSession::new(prompt, profile);
863        let session_id = session.id;
864
865        // Store session
866        {
867            let mut sessions = self.active_sessions.write().await;
868            sessions.insert(session_id, session.clone());
869        }
870
871        // Send start event
872        let _ = tx.send(ReasoningEvent::SessionStarted {
873            session_id,
874            profile,
875            prompt: prompt.to_string(),
876        });
877
878        // Execute reasoning
879        let decision = self.execute_loop(session, Some(&tx)).await?;
880
881        // Send completion event
882        let _ = tx.send(ReasoningEvent::SessionCompleted {
883            success: decision.success,
884        });
885
886        // Remove from active sessions
887        {
888            let mut sessions = self.active_sessions.write().await;
889            sessions.remove(&session_id);
890        }
891
892        Ok((
893            StreamHandle {
894                receiver: rx,
895                session_id,
896            },
897            decision,
898        ))
899    }
900
    /// Execute reasoning and return decision (blocking until complete)
    ///
    /// Delegates to [`Self::reason_with_profile`] using the configured
    /// default profile; no events are emitted.
    pub async fn reason(&self, prompt: &str) -> Result<Decision> {
        self.reason_with_profile(prompt, self.config.default_profile)
            .await
    }
906
    /// Execute reasoning with custom profile
    ///
    /// Non-streaming path: no event channel is created and the session is
    /// not registered in `active_sessions`.
    pub async fn reason_with_profile(&self, prompt: &str, profile: Profile) -> Result<Decision> {
        let session = ReasoningSession::new(prompt, profile);
        self.execute_loop(session, None).await
    }
912
    /// Core execution loop
    ///
    /// Pipeline: (1) optional memory retrieval, (2) sequential execution of
    /// the profile's ThinkTool chain, (3) synthesis into a `Decision`.
    /// Events are emitted on `event_tx` when present; send failures are
    /// deliberately ignored (no subscriber is not an error).
    ///
    /// NOTE(review): `config.enable_parallel` / `max_concurrent` are not
    /// consulted here — the chain always runs sequentially. The timeout is
    /// only checked *between* steps, so one in-flight tool call can exceed
    /// `config.timeout`. Confirm both are intended.
    async fn execute_loop(
        &self,
        mut session: ReasoningSession,
        event_tx: Option<&broadcast::Sender<ReasoningEvent>>,
    ) -> Result<Decision> {
        let start = Instant::now();
        let profile = session.profile;

        // Step 1: Query memory if enabled. Failures degrade to a Warning
        // event and the loop proceeds without context (best-effort).
        if self.config.enable_memory {
            if let Some(ref memory) = self.memory {
                let mem_start = Instant::now();
                match memory
                    .query(
                        &session.prompt,
                        self.config.memory_top_k,
                        self.config.memory_min_score,
                    )
                    .await
                {
                    Ok(context) => {
                        let retrieval_time = mem_start.elapsed().as_millis() as u64;

                        if let Some(tx) = event_tx {
                            let _ = tx.send(ReasoningEvent::MemoryQueried {
                                chunks_found: context.chunks.len(),
                                retrieval_time_ms: retrieval_time,
                            });
                        }

                        session.memory_context = Some(context);
                    }
                    Err(e) => {
                        if let Some(tx) = event_tx {
                            let _ = tx.send(ReasoningEvent::Warning {
                                message: format!("Memory query failed: {}", e),
                            });
                        }
                    }
                }
            }
        }

        // Step 2: Execute ThinkTool chain (sequential; each step sees all
        // prior outputs via `previous_outputs`)
        let tools = profile.thinktool_chain();
        let total_steps = tools.len();

        let mut previous_outputs: HashMap<String, ThinkToolResult> = HashMap::new();

        for (step_idx, tool_id) in tools.iter().enumerate() {
            // Check timeout (between steps only; see NOTE above)
            if start.elapsed() > self.config.timeout {
                return Err(ReasoningError::Timeout(self.config.timeout));
            }

            // Emit step start event
            if let Some(tx) = event_tx {
                let _ = tx.send(ReasoningEvent::StepStarted {
                    step_index: step_idx,
                    total_steps,
                    tool_id: tool_id.to_string(),
                });
            }

            // Build input for this step (prompt + memory + prior outputs)
            let input = self.build_step_input(&session, &previous_outputs, step_idx);

            // Create execution context (clones memory/prior outputs per step)
            let context = ExecutionContext {
                session_id: session.id,
                profile,
                memory: session.memory_context.clone(),
                previous_outputs: previous_outputs.clone(),
                temperature: self.config.temperature,
                max_tokens: self.config.max_tokens,
            };

            // Execute with retry logic; an exhausted retry propagates via `?`,
            // which is why the recorded step below is always success=true.
            let result = self.execute_with_retry(tool_id, &input, &context).await?;

            // Create reasoning step
            let step = ReasoningStep {
                index: step_idx,
                kind: StepKind::ThinkTool {
                    tool_id: tool_id.to_string(),
                },
                input: input.clone(),
                output: result.content.clone(),
                confidence: result.confidence,
                duration_ms: result.duration_ms,
                tokens: result.tokens.clone(),
                success: true,
                error: None,
            };

            // Emit step complete event
            if let Some(tx) = event_tx {
                let _ = tx.send(ReasoningEvent::StepCompleted {
                    step_index: step_idx,
                    tool_id: tool_id.to_string(),
                    confidence: result.confidence,
                    duration_ms: result.duration_ms,
                });
            }

            // Store result for next step
            // NOTE(review): keyed by tool id, so a repeated tool (Paranoid's
            // second "proofguard" pass) overwrites its earlier result.
            previous_outputs.insert(tool_id.to_string(), result);
            session.add_step(step);
        }

        // Step 3: Synthesize final decision
        let (conclusion, insights, caveats) = self.synthesize_decision(&session, &previous_outputs);

        session.complete(conclusion, insights, caveats);

        // Safe unwrap: `complete()` just populated `session.decision`.
        let decision = session.decision.clone().unwrap();

        // Emit decision event
        if let Some(tx) = event_tx {
            let _ = tx.send(ReasoningEvent::DecisionReached {
                confidence: decision.confidence,
                total_duration_ms: decision.total_duration_ms,
            });
        }

        // Store session to memory if enabled (best-effort; errors ignored)
        if self.config.enable_memory {
            if let Some(ref memory) = self.memory {
                let _ = memory.store_session(&session).await;
            }
        }

        Ok(decision)
    }
1048
1049    /// Build input for a step based on previous outputs
1050    fn build_step_input(
1051        &self,
1052        session: &ReasoningSession,
1053        previous_outputs: &HashMap<String, ThinkToolResult>,
1054        step_idx: usize,
1055    ) -> String {
1056        let mut input = session.prompt.clone();
1057
1058        // Add memory context if available
1059        if let Some(ref memory) = session.memory_context {
1060            if !memory.chunks.is_empty() {
1061                input.push_str("\n\n--- RELEVANT CONTEXT ---\n");
1062                for chunk in memory.chunks.iter().take(3) {
1063                    input.push_str(&format!("- {}\n", chunk.text));
1064                }
1065            }
1066        }
1067
1068        // Add previous step outputs for context
1069        if step_idx > 0 {
1070            input.push_str("\n\n--- PREVIOUS ANALYSIS ---\n");
1071            for (tool_id, result) in previous_outputs {
1072                // Truncate long outputs
1073                let content = if result.content.len() > 500 {
1074                    format!("{}...", &result.content[..500])
1075                } else {
1076                    result.content.clone()
1077                };
1078                input.push_str(&format!(
1079                    "[{}] (confidence: {:.0}%)\n{}\n\n",
1080                    tool_id,
1081                    result.confidence * 100.0,
1082                    content
1083                ));
1084            }
1085        }
1086
1087        input
1088    }
1089
1090    /// Execute a ThinkTool with retry logic
1091    async fn execute_with_retry(
1092        &self,
1093        tool_id: &str,
1094        input: &str,
1095        context: &ExecutionContext,
1096    ) -> Result<ThinkToolResult> {
1097        let mut last_error = None;
1098
1099        for attempt in 0..=self.config.max_retries {
1100            match self.executor.execute(tool_id, input, context).await {
1101                Ok(result) => return Ok(result),
1102                Err(e) => {
1103                    if !self.config.retry_on_failure || attempt == self.config.max_retries {
1104                        return Err(e);
1105                    }
1106                    last_error = Some(e);
1107                    // Exponential backoff
1108                    let delay = Duration::from_millis(100 * 2u64.pow(attempt));
1109                    tokio::time::sleep(delay).await;
1110                }
1111            }
1112        }
1113
1114        Err(
1115            last_error.unwrap_or_else(|| ReasoningError::ThinkToolFailed {
1116                tool: tool_id.to_string(),
1117                message: "Unknown error".into(),
1118            }),
1119        )
1120    }
1121
1122    /// Synthesize final decision from all steps
1123    fn synthesize_decision(
1124        &self,
1125        session: &ReasoningSession,
1126        outputs: &HashMap<String, ThinkToolResult>,
1127    ) -> (String, Vec<String>, Vec<String>) {
1128        // Extract conclusion from last step
1129        let conclusion = outputs
1130            .values()
1131            .last()
1132            .map(|r| r.content.clone())
1133            .unwrap_or_else(|| "No conclusion reached".to_string());
1134
1135        // Extract insights from various tools
1136        let mut insights = Vec::new();
1137        if let Some(gt) = outputs.get("gigathink") {
1138            if let Some(structured) = &gt.structured {
1139                if let Some(perspectives) = structured.get("perspectives") {
1140                    if let Some(arr) = perspectives.as_array() {
1141                        for p in arr.iter().take(3) {
1142                            if let Some(s) = p.as_str() {
1143                                insights.push(s.to_string());
1144                            }
1145                        }
1146                    }
1147                }
1148            }
1149        }
1150
1151        // Extract caveats from BrutalHonesty if available
1152        let mut caveats = Vec::new();
1153        if let Some(bh) = outputs.get("brutalhonesty") {
1154            if bh.content.to_lowercase().contains("caveat")
1155                || bh.content.to_lowercase().contains("limitation")
1156            {
1157                caveats.push("See BrutalHonesty analysis for detailed limitations".to_string());
1158            }
1159        }
1160
1161        // Add confidence-based caveat
1162        let confidence = session.current_confidence();
1163        if confidence < session.profile.min_confidence() {
1164            caveats.push(format!(
1165                "Confidence ({:.0}%) below target ({:.0}%)",
1166                confidence * 100.0,
1167                session.profile.min_confidence() * 100.0
1168            ));
1169        }
1170
1171        (conclusion, insights, caveats)
1172    }
1173
1174    /// Get an active session by ID
1175    pub async fn get_session(&self, session_id: Uuid) -> Option<ReasoningSession> {
1176        let sessions = self.active_sessions.read().await;
1177        sessions.get(&session_id).cloned()
1178    }
1179
1180    /// Cancel an active session
1181    pub async fn cancel_session(&self, session_id: Uuid) -> Result<()> {
1182        let mut sessions = self.active_sessions.write().await;
1183        if sessions.remove(&session_id).is_some() {
1184            Ok(())
1185        } else {
1186            Err(ReasoningError::Config(format!(
1187                "Session {} not found",
1188                session_id
1189            )))
1190        }
1191    }
1192
1193    /// Get count of active sessions
1194    pub async fn active_session_count(&self) -> usize {
1195        let sessions = self.active_sessions.read().await;
1196        sessions.len()
1197    }
1198}
1199
1200// ============================================================================
1201// TESTS
1202// ============================================================================
1203
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_profile_chains() {
        // Chain length per profile, table-driven
        let expected = [
            (Profile::Quick, 2),
            (Profile::Balanced, 4),
            (Profile::Deep, 5),
            (Profile::Paranoid, 6),
        ];
        for (profile, len) in expected {
            assert_eq!(profile.thinktool_chain().len(), len);
        }
    }

    #[test]
    fn test_profile_confidence() {
        // Minimum confidence threshold per profile
        let expected = [
            (Profile::Quick, 0.70),
            (Profile::Balanced, 0.80),
            (Profile::Deep, 0.85),
            (Profile::Paranoid, 0.95),
        ];
        for (profile, min) in expected {
            assert_eq!(profile.min_confidence(), min);
        }
    }

    #[test]
    fn test_profile_from_str() {
        // Parsing accepts full names (any case) and single-letter aliases
        let cases = [
            ("quick", Some(Profile::Quick)),
            ("Q", Some(Profile::Quick)),
            ("balanced", Some(Profile::Balanced)),
            ("PARANOID", Some(Profile::Paranoid)),
            ("invalid", None),
        ];
        for (text, expected) in cases {
            assert_eq!(Profile::parse_profile(text), expected);
        }
    }

    #[test]
    fn test_config_defaults() {
        let cfg = ReasoningConfig::default();
        assert_eq!(cfg.default_profile, Profile::Balanced);
        assert!(cfg.enable_parallel);
        assert_eq!(cfg.max_concurrent, 4);
        assert!(!cfg.enable_memory);
    }

    #[test]
    fn test_session_creation() {
        let fresh = ReasoningSession::new("Test prompt", Profile::Balanced);
        assert!(!fresh.completed);
        assert_eq!(fresh.current_step, 0);
        assert!(fresh.steps.is_empty());
    }

    #[test]
    fn test_session_confidence() {
        // Helper to build a successful ThinkTool step with a given confidence
        fn make_step(index: usize, tool_id: &str, confidence: f64) -> ReasoningStep {
            ReasoningStep {
                index,
                kind: StepKind::ThinkTool {
                    tool_id: tool_id.into(),
                },
                input: "test".into(),
                output: "output".into(),
                confidence,
                duration_ms: 100,
                tokens: TokenUsage::default(),
                success: true,
                error: None,
            }
        }

        let mut session = ReasoningSession::new("Test", Profile::Balanced);

        // Empty session has 0 confidence
        assert_eq!(session.current_confidence(), 0.0);

        // Confidence is the mean of step confidences: (0.8 + 0.9) / 2 = 0.85
        session.add_step(make_step(0, "gigathink", 0.8));
        session.add_step(make_step(1, "laserlogic", 0.9));

        assert!((session.current_confidence() - 0.85).abs() < 1e-9);
    }

    #[test]
    fn test_token_usage_add() {
        let mut accumulated = TokenUsage {
            input_tokens: 100,
            output_tokens: 50,
            total_tokens: 150,
            cost_usd: 0.001,
        };

        let increment = TokenUsage {
            input_tokens: 200,
            output_tokens: 100,
            total_tokens: 300,
            cost_usd: 0.002,
        };

        accumulated.add(&increment);

        assert_eq!(accumulated.input_tokens, 300);
        assert_eq!(accumulated.output_tokens, 150);
        assert_eq!(accumulated.total_tokens, 450);
        assert!((accumulated.cost_usd - 0.003).abs() < 0.0001);
    }

    #[test]
    fn test_decision_meets_threshold() {
        let base = Decision {
            id: Uuid::new_v4(),
            session_id: Uuid::new_v4(),
            prompt: "test".into(),
            profile: Profile::Balanced,
            conclusion: "test conclusion".into(),
            confidence: 0.85,
            steps: vec![],
            total_tokens: TokenUsage::default(),
            total_duration_ms: 1000,
            memory_context: None,
            success: true,
            insights: vec![],
            caveats: vec![],
            timestamp: chrono::Utc::now(),
        };

        // 0.85 >= 0.80 (Balanced target)
        assert!(base.meets_threshold());

        // 0.75 < 0.80 fails the threshold
        let weak = Decision {
            confidence: 0.75,
            ..base.clone()
        };
        assert!(!weak.meets_threshold());
    }
}