codetether_agent/session/mod.rs

//! Session management
//!
//! Sessions track the conversation history and state for agent interactions.
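//!
//! # Example
//!
//! An illustrative sketch of the typical flow (assumes a Tokio runtime and at
//! least one provider configured in Vault):
//!
//! ```ignore
//! let mut session = Session::new().await?;
//! let result = session.prompt("Summarise the repository layout").await?;
//! println!("{}", result.text);
//! ```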

use crate::agent::ToolUse;
use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
use crate::provider::{Message, Usage};
use crate::tool::ToolRegistry;
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use uuid::Uuid;

#[cfg(feature = "functiongemma")]
use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};

fn is_interactive_tool(tool_name: &str) -> bool {
    matches!(tool_name, "question")
}

fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
    // Keep Google as an explicit option, but don't default to it first because
    // some environments expose API keys that are not valid for ChatCompletions.
    let preferred = [
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "openrouter",
        "novita",
        "moonshotai",
        "google",
    ];
    for name in preferred {
        if let Some(found) = providers.iter().copied().find(|p| *p == name) {
            return Some(found);
        }
    }
    providers.first().copied()
}

/// A conversation session
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    pub id: String,
    pub title: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub messages: Vec<Message>,
    pub tool_uses: Vec<ToolUse>,
    pub usage: Usage,
    pub agent: String,
    pub metadata: SessionMetadata,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionMetadata {
    pub directory: Option<PathBuf>,
    pub model: Option<String>,
    pub shared: bool,
    pub share_url: Option<String>,
}

impl Session {
    fn default_model_for_provider(provider: &str) -> String {
        match provider {
            "moonshotai" => "kimi-k2.5".to_string(),
            "anthropic" => "claude-sonnet-4-20250514".to_string(),
            "openai" => "gpt-4o".to_string(),
            "google" => "gemini-2.5-pro".to_string(),
            "zhipuai" | "zai" => "glm-5".to_string(),
            // OpenRouter uses model IDs like "z-ai/glm-5".
            "openrouter" => "z-ai/glm-5".to_string(),
            "novita" => "qwen/qwen3-coder-next".to_string(),
            "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
            _ => "glm-5".to_string(),
        }
    }

    /// Create a new session
    pub async fn new() -> Result<Self> {
        let id = Uuid::new_v4().to_string();
        let now = Utc::now();

        Ok(Self {
            id,
            title: None,
            created_at: now,
            updated_at: now,
            messages: Vec::new(),
            tool_uses: Vec::new(),
            usage: Usage::default(),
            agent: "build".to_string(),
            metadata: SessionMetadata {
                directory: Some(std::env::current_dir()?),
                ..Default::default()
            },
        })
    }

    /// Load an existing session
    pub async fn load(id: &str) -> Result<Self> {
        let path = Self::session_path(id)?;
        let content = fs::read_to_string(&path).await?;
        let session: Session = serde_json::from_str(&content)?;
        Ok(session)
    }

    /// Load the last session, optionally scoped to a workspace directory
    ///
    /// When `workspace` is Some, only considers sessions created in that directory.
    /// When None, returns the most recent session globally (legacy behavior).
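    ///
    /// # Example
    ///
    /// Illustrative sketch (assumes an async context):
    ///
    /// ```ignore
    /// // Resume the most recent session created in the current workspace, if any.
    /// let cwd = std::env::current_dir()?;
    /// let session = Session::last_for_directory(Some(&cwd)).await?;
    /// ```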
    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
        let sessions_dir = Self::sessions_dir()?;

        if !sessions_dir.exists() {
            anyhow::bail!("No sessions found");
        }

        let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
        let mut read_dir = fs::read_dir(&sessions_dir).await?;
        while let Some(entry) = read_dir.next_entry().await? {
            entries.push(entry);
        }

        if entries.is_empty() {
            anyhow::bail!("No sessions found");
        }

        // Sort by modification time (most recent first)
        // Use std::fs::metadata since we can't await in sort_by_key
        entries.sort_by_key(|e| {
            std::cmp::Reverse(
                std::fs::metadata(e.path())
                    .ok()
                    .and_then(|m| m.modified().ok())
                    .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
            )
        });

        let canonical_workspace =
            workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));

        for entry in &entries {
            let content: String = fs::read_to_string(entry.path()).await?;
            if let Ok(session) = serde_json::from_str::<Session>(&content) {
                // If workspace scoping requested, filter by directory
                if let Some(ref ws) = canonical_workspace {
                    if let Some(ref dir) = session.metadata.directory {
                        let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
                        if &canonical_dir == ws {
                            return Ok(session);
                        }
                    }
                    continue;
                }
                return Ok(session);
            }
        }

        anyhow::bail!("No sessions found")
    }

    /// Load the last session (global, unscoped — legacy compatibility)
    pub async fn last() -> Result<Self> {
        Self::last_for_directory(None).await
    }

    /// Save the session to disk
    pub async fn save(&self) -> Result<()> {
        let path = Self::session_path(&self.id)?;

        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).await?;
        }

        let content = serde_json::to_string_pretty(self)?;
        fs::write(&path, content).await?;

        Ok(())
    }

    /// Add a message to the session
    pub fn add_message(&mut self, message: Message) {
        self.messages.push(message);
        self.updated_at = Utc::now();
    }

    /// Execute a prompt and get the result
    pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
        use crate::provider::{
            CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
        };

        // Load providers from Vault
        let registry = ProviderRegistry::from_vault().await?;

        let providers = registry.list();
        if providers.is_empty() {
            anyhow::bail!(
                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
            );
        }

        tracing::info!("Available providers: {:?}", providers);

        // Parse model string (format: "provider/model", "provider", or just "model")
        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
            let (prov, model) = parse_model_string(model_str);
            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
            if prov.is_some() {
                // Format: provider/model
                (prov.map(|s| s.to_string()), model.to_string())
            } else if providers.contains(&model) {
                // Format: just provider name (e.g., "novita")
                (Some(model.to_string()), String::new())
            } else {
                // Format: just model name
                (None, model.to_string())
            }
        } else {
            (None, String::new())
        };

        // Determine which provider to use with deterministic fallback ordering.
        let selected_provider = provider_name
            .as_deref()
            .filter(|p| providers.contains(p))
            .or_else(|| choose_default_provider(providers.as_slice()))
            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;

        let provider = registry
            .get(selected_provider)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

        // Add user message to session using add_message
        self.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: message.to_string(),
            }],
        });

        // Generate title if this is the first user message and no title exists
        if self.title.is_none() {
            self.generate_title().await?;
        }

        // Determine model to use
        let model = if !model_id.is_empty() {
            model_id
        } else {
            Self::default_model_for_provider(selected_provider)
        };

        // Create tool registry with all available tools
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|tool| !is_interactive_tool(&tool.name))
            .collect();

        // Some models behave best with temperature=1.0.
        // - Kimi K2.x requires temperature=1.0
        // - GLM (Z.AI) defaults to temperature 1.0 for coding workflows
        // Use contains() to match both short aliases and provider-qualified IDs.
        let temperature = if model.contains("kimi-k2") || model.contains("glm-") {
            Some(1.0)
        } else {
            Some(0.7)
        };

        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
        tracing::info!("Available tools: {}", tool_definitions.len());

        // All current providers support native tool calling.  Hardcode to
        // true so we skip the expensive list_models() API call on every message.
        #[cfg(feature = "functiongemma")]
        let model_supports_tools = true;

        // Build system prompt with AGENTS.md
        let cwd = self
            .metadata
            .directory
            .clone()
            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);

        // Run agentic loop with tool execution
        let max_steps = 50;
        let mut final_output = String::new();

        // Initialise the FunctionGemma tool-call router (feature-gated, opt-in).
        #[cfg(feature = "functiongemma")]
        let tool_router: Option<ToolCallRouter> = {
            let cfg = ToolRouterConfig::from_env();
            match ToolCallRouter::from_config(&cfg) {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                    None
                }
            }
        };

        for step in 1..=max_steps {
            tracing::info!(step = step, "Agent step starting");

            // Build messages with system prompt first
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            messages.extend(self.messages.clone());

            // Create completion request with tools
            let request = CompletionRequest {
                messages,
                tools: tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(8192),
                stop: Vec::new(),
            };

            // Call the provider
            let response = provider.complete(request).await?;

            // Optionally route text-only responses through FunctionGemma to
            // produce structured tool calls.  Skipped when the model natively
            // supports tool calling (which all current providers do).
            #[cfg(feature = "functiongemma")]
            let response = if let Some(ref router) = tool_router {
                router
                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
                    .await
            } else {
                response
            };

            // Record token usage
            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &model,
                response.usage.prompt_tokens as u64,
                response.usage.completion_tokens as u64,
            );

            // Extract tool calls from response
            let tool_calls: Vec<(String, String, serde_json::Value)> = response
                .message
                .content
                .iter()
                .filter_map(|part| {
                    if let ContentPart::ToolCall {
                        id,
                        name,
                        arguments,
                    } = part
                    {
                        // Parse arguments JSON string into Value
                        let args: serde_json::Value =
                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                        Some((id.clone(), name.clone(), args))
                    } else {
                        None
                    }
                })
                .collect();

            // Collect text output
            for part in &response.message.content {
                if let ContentPart::Text { text } = part {
                    if !text.is_empty() {
                        final_output.push_str(text);
                        final_output.push('\n');
                    }
                }
            }

            // If no tool calls, we're done
            if tool_calls.is_empty() {
                self.add_message(response.message.clone());
                break;
            }

            // Add assistant message with tool calls
            self.add_message(response.message.clone());

            tracing::info!(
                step = step,
                num_tools = tool_calls.len(),
                "Executing tool calls"
            );

            // Execute each tool call
            for (tool_id, tool_name, tool_input) in tool_calls {
                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

                if is_interactive_tool(&tool_name) {
                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                    self.add_message(Message {
                        role: Role::Tool,
                        content: vec![ContentPart::ToolResult {
                            tool_call_id: tool_id,
                            content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
                        }],
                    });
                    continue;
                }

                // Get and execute the tool
                let exec_start = std::time::Instant::now();
                let content = if let Some(tool) = tool_registry.get(&tool_name) {
                    match tool.execute(tool_input.clone()).await {
                        Ok(result) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                            if let Some(audit) = try_audit_log() {
                                audit.log(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
                                ).await;
                            }
                            result.output
                        }
                        Err(e) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                            if let Some(audit) = try_audit_log() {
                                audit.log(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    AuditOutcome::Failure,
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                                ).await;
                            }
                            format!("Error: {}", e)
                        }
                    }
                } else {
                    tracing::warn!(tool = %tool_name, "Tool not found");
                    if let Some(audit) = try_audit_log() {
                        audit
                            .log(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                AuditOutcome::Failure,
                                None,
                                Some(json!({ "error": "unknown_tool" })),
                            )
                            .await;
                    }
                    format!("Error: Unknown tool '{}'", tool_name)
                };

                // Add tool result message
                self.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
            }
        }

        // Save session after each prompt to persist messages
        self.save().await?;

        Ok(SessionResult {
            text: final_output.trim().to_string(),
            session_id: self.id.clone(),
        })
    }

    /// Process a user message with real-time event streaming for UI updates.
    /// Events are sent through the provided channel as tool calls execute.
    ///
    /// Accepts a pre-loaded `ProviderRegistry` to avoid re-fetching secrets
    /// from Vault on every message (which was the primary TUI performance
    /// bottleneck).
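    ///
    /// # Example
    ///
    /// Illustrative sketch (assumes a Tokio runtime, a mutable `session`, and a
    /// registry already loaded from Vault):
    ///
    /// ```ignore
    /// let registry = std::sync::Arc::new(ProviderRegistry::from_vault().await?);
    /// let (tx, mut rx) = tokio::sync::mpsc::channel(64);
    /// tokio::spawn(async move {
    ///     while let Some(event) = rx.recv().await {
    ///         // Render each SessionEvent as it arrives (see the enum below).
    ///     }
    /// });
    /// let result = session.prompt_with_events("Summarise the diff", tx, registry).await?;
    /// ```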
    pub async fn prompt_with_events(
        &mut self,
        message: &str,
        event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
        registry: std::sync::Arc<crate::provider::ProviderRegistry>,
    ) -> Result<SessionResult> {
        use crate::provider::{CompletionRequest, ContentPart, Role, parse_model_string};

        let _ = event_tx.send(SessionEvent::Thinking).await;

        let providers = registry.list();
        if providers.is_empty() {
            anyhow::bail!(
                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
            );
        }
        tracing::info!("Available providers: {:?}", providers);

        // Parse model string (format: "provider/model", "provider", or just "model")
        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
            let (prov, model) = parse_model_string(model_str);
            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
            if prov.is_some() {
                (prov.map(|s| s.to_string()), model.to_string())
            } else if providers.contains(&model) {
                (Some(model.to_string()), String::new())
            } else {
                (None, model.to_string())
            }
        } else {
            (None, String::new())
        };

        // Determine which provider to use with deterministic fallback ordering.
        let selected_provider = provider_name
            .as_deref()
            .filter(|p| providers.contains(p))
            .or_else(|| choose_default_provider(providers.as_slice()))
            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;

        let provider = registry
            .get(selected_provider)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

        // Add user message
        self.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: message.to_string(),
            }],
        });

        // Generate title if needed
        if self.title.is_none() {
            self.generate_title().await?;
        }

        // Determine model
        let model = if !model_id.is_empty() {
            model_id
        } else {
            Self::default_model_for_provider(selected_provider)
        };

        // Create tool registry
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|tool| !is_interactive_tool(&tool.name))
            .collect();

        let temperature = if model.contains("kimi-k2") || model.contains("glm-") {
            Some(1.0)
        } else {
            Some(0.7)
        };

        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
        tracing::info!("Available tools: {}", tool_definitions.len());

        // All current providers support native tool calling.  Hardcode to
        // true so we skip the expensive list_models() API call on every message.
        #[cfg(feature = "functiongemma")]
        let model_supports_tools = true;

        // Build system prompt
        let cwd = std::env::var("PWD")
            .map(std::path::PathBuf::from)
            .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);

        let mut final_output = String::new();
        let max_steps = 50;

        // Initialise the FunctionGemma tool-call router (feature-gated, opt-in).
        #[cfg(feature = "functiongemma")]
        let tool_router: Option<ToolCallRouter> = {
            let cfg = ToolRouterConfig::from_env();
            match ToolCallRouter::from_config(&cfg) {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                    None
                }
            }
        };

        for step in 1..=max_steps {
            tracing::info!(step = step, "Agent step starting");
            let _ = event_tx.send(SessionEvent::Thinking).await;

            // Build messages with system prompt first
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            messages.extend(self.messages.clone());

            let request = CompletionRequest {
                messages,
                tools: tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(8192),
                stop: Vec::new(),
            };

            let llm_start = std::time::Instant::now();
            let response = provider.complete(request).await?;
            let llm_duration_ms = llm_start.elapsed().as_millis() as u64;

            // Optionally route text-only responses through FunctionGemma to
            // produce structured tool calls.  Skipped for native tool-calling models.
            #[cfg(feature = "functiongemma")]
            let response = if let Some(ref router) = tool_router {
                router
                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
                    .await
            } else {
                response
            };

            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &model,
                response.usage.prompt_tokens as u64,
                response.usage.completion_tokens as u64,
            );

            // Emit usage report for TUI display
            let _ = event_tx
                .send(SessionEvent::UsageReport {
                    prompt_tokens: response.usage.prompt_tokens,
                    completion_tokens: response.usage.completion_tokens,
                    duration_ms: llm_duration_ms,
                    model: model.clone(),
                })
                .await;

            // Extract tool calls
            let tool_calls: Vec<(String, String, serde_json::Value)> = response
                .message
                .content
                .iter()
                .filter_map(|part| {
                    if let ContentPart::ToolCall {
                        id,
                        name,
                        arguments,
                    } = part
                    {
                        let args: serde_json::Value =
                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                        Some((id.clone(), name.clone(), args))
                    } else {
                        None
                    }
                })
                .collect();

            // Collect thinking and text output for this step
            let mut thinking_text = String::new();
            let mut step_text = String::new();
            for part in &response.message.content {
                match part {
                    ContentPart::Thinking { text } => {
                        if !text.is_empty() {
                            thinking_text.push_str(text);
                            thinking_text.push('\n');
                        }
                    }
                    ContentPart::Text { text } => {
                        if !text.is_empty() {
                            step_text.push_str(text);
                            step_text.push('\n');
                        }
                    }
                    _ => {}
                }
            }

            // Emit thinking output first
            if !thinking_text.trim().is_empty() {
                let _ = event_tx
                    .send(SessionEvent::ThinkingComplete(
                        thinking_text.trim().to_string(),
                    ))
                    .await;
            }

            // Emit this step's text BEFORE tool calls so it appears in correct
            // chronological order in the TUI chat display.
            if !step_text.trim().is_empty() {
                let trimmed = step_text.trim().to_string();
                let _ = event_tx
                    .send(SessionEvent::TextChunk(trimmed.clone()))
                    .await;
                let _ = event_tx.send(SessionEvent::TextComplete(trimmed)).await;
                final_output.push_str(&step_text);
            }

            if tool_calls.is_empty() {
                self.add_message(response.message.clone());
                break;
            }

            self.add_message(response.message.clone());

            tracing::info!(
                step = step,
                num_tools = tool_calls.len(),
                "Executing tool calls"
            );

            // Execute each tool call with events
            for (tool_id, tool_name, tool_input) in tool_calls {
                let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
                let _ = event_tx
                    .send(SessionEvent::ToolCallStart {
                        name: tool_name.clone(),
                        arguments: args_str,
                    })
                    .await;

                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

                if is_interactive_tool(&tool_name) {
                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                    let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
                    let _ = event_tx
                        .send(SessionEvent::ToolCallComplete {
                            name: tool_name.clone(),
                            output: content.clone(),
                            success: false,
                        })
                        .await;
                    self.add_message(Message {
                        role: Role::Tool,
                        content: vec![ContentPart::ToolResult {
                            tool_call_id: tool_id,
                            content,
                        }],
                    });
                    continue;
                }

                let exec_start = std::time::Instant::now();
                let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
                    match tool.execute(tool_input.clone()).await {
                        Ok(result) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                            if let Some(audit) = try_audit_log() {
                                audit.log(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
                                ).await;
                            }
                            (result.output, result.success)
                        }
                        Err(e) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                            if let Some(audit) = try_audit_log() {
                                audit.log(
                                    AuditCategory::ToolExecution,
                                    format!("tool:{}", tool_name),
                                    AuditOutcome::Failure,
                                    None,
                                    Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                                ).await;
                            }
                            (format!("Error: {}", e), false)
                        }
                    }
                } else {
                    tracing::warn!(tool = %tool_name, "Tool not found");
                    if let Some(audit) = try_audit_log() {
                        audit
                            .log(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                AuditOutcome::Failure,
                                None,
                                Some(json!({ "error": "unknown_tool" })),
                            )
                            .await;
                    }
                    (format!("Error: Unknown tool '{}'", tool_name), false)
                };

                let _ = event_tx
                    .send(SessionEvent::ToolCallComplete {
                        name: tool_name.clone(),
                        output: content.clone(),
                        success,
                    })
                    .await;

                self.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
            }
        }

        self.save().await?;

        // Text was already sent per-step via TextComplete events.
        // Send updated session state so the caller can sync back.
        let _ = event_tx.send(SessionEvent::SessionSync(self.clone())).await;
        let _ = event_tx.send(SessionEvent::Done).await;

        Ok(SessionResult {
            text: final_output.trim().to_string(),
            session_id: self.id.clone(),
        })
    }

    /// Generate a title for the session based on the first message
    /// Only sets title if not already set (for initial title generation)
    pub async fn generate_title(&mut self) -> Result<()> {
        if self.title.is_some() {
            return Ok(());
        }

        // Get first user message
        let first_message = self
            .messages
            .iter()
            .find(|m| m.role == crate::provider::Role::User);

        if let Some(msg) = first_message {
            let text: String = msg
                .content
                .iter()
                .filter_map(|p| match p {
                    crate::provider::ContentPart::Text { text } => Some(text.clone()),
                    _ => None,
                })
                .collect::<Vec<_>>()
                .join(" ");

            // Truncate to reasonable length
            self.title = Some(truncate_with_ellipsis(&text, 47));
        }

        Ok(())
    }

    /// Regenerate the title based on the first message, even if already set
    /// Use this for on-demand title updates or after context changes
    pub async fn regenerate_title(&mut self) -> Result<()> {
        // Get first user message
        let first_message = self
            .messages
            .iter()
            .find(|m| m.role == crate::provider::Role::User);

        if let Some(msg) = first_message {
            let text: String = msg
                .content
                .iter()
                .filter_map(|p| match p {
                    crate::provider::ContentPart::Text { text } => Some(text.clone()),
                    _ => None,
                })
                .collect::<Vec<_>>()
                .join(" ");

            // Truncate to reasonable length
            self.title = Some(truncate_with_ellipsis(&text, 47));
        }

        Ok(())
    }

    /// Set a custom title for the session
    pub fn set_title(&mut self, title: impl Into<String>) {
        self.title = Some(title.into());
        self.updated_at = Utc::now();
    }

    /// Clear the title, allowing it to be regenerated
    pub fn clear_title(&mut self) {
        self.title = None;
        self.updated_at = Utc::now();
    }

    /// Handle context change - updates metadata and optionally regenerates title
    /// Call this when the session context changes (e.g., directory change, model change)
    pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
        self.updated_at = Utc::now();

        if regenerate_title {
            self.regenerate_title().await?;
        }

        Ok(())
    }

    /// Import an OpenCode session into CodeTether
    ///
    /// Loads messages and parts from OpenCode storage and converts them
    /// into a CodeTether session that can be resumed.
    pub async fn from_opencode(
        session_id: &str,
        storage: &crate::opencode::OpenCodeStorage,
    ) -> Result<Self> {
        let oc_session = storage.load_session(session_id).await?;
        let oc_messages = storage.load_messages(session_id).await?;

        let mut messages_with_parts = Vec::new();
        for msg in oc_messages {
            let parts = storage.load_parts(&msg.id).await?;
            messages_with_parts.push((msg, parts));
        }

        crate::opencode::convert::to_codetether_session(&oc_session, messages_with_parts).await
    }

    /// Try to load the last OpenCode session for a directory as a fallback
    pub async fn last_opencode_for_directory(dir: &std::path::Path) -> Result<Self> {
        let storage = crate::opencode::OpenCodeStorage::new()
            .ok_or_else(|| anyhow::anyhow!("OpenCode storage directory not found"))?;

        if !storage.exists() {
            anyhow::bail!("OpenCode storage does not exist");
        }

        let oc_session = storage.last_session_for_directory(dir).await?;
        Self::from_opencode(&oc_session.id, &storage).await
    }

    /// Delete a session by ID
    pub async fn delete(id: &str) -> Result<()> {
        let path = Self::session_path(id)?;
        if path.exists() {
            tokio::fs::remove_file(&path).await?;
        }
        Ok(())
    }

    /// Get the sessions directory
    fn sessions_dir() -> Result<PathBuf> {
        crate::config::Config::data_dir()
            .map(|d| d.join("sessions"))
            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
    }

    /// Get the path for a session file
    fn session_path(id: &str) -> Result<PathBuf> {
        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
    }
}

/// Result from a session prompt
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionResult {
    pub text: String,
    pub session_id: String,
}

/// Events emitted during session processing for real-time UI updates
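///
/// # Example
///
/// Illustrative sketch of a consumer loop on the receiving end of the channel
/// passed to `prompt_with_events` (assumes `rx` is that receiver):
///
/// ```ignore
/// while let Some(event) = rx.recv().await {
///     match event {
///         SessionEvent::TextComplete(text) => println!("{text}"),
///         SessionEvent::ToolCallStart { name, .. } => println!("running {name}..."),
///         SessionEvent::ToolCallComplete { name, success, .. } => {
///             println!("{name} finished (success: {success})");
///         }
///         SessionEvent::Error(e) => eprintln!("error: {e}"),
///         SessionEvent::Done => break,
///         _ => {}
///     }
/// }
/// ```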
#[derive(Debug, Clone)]
pub enum SessionEvent {
    /// Agent is thinking/processing
    Thinking,
    /// Tool call started
    ToolCallStart { name: String, arguments: String },
    /// Tool call completed with result
    ToolCallComplete {
        name: String,
        output: String,
        success: bool,
    },
    /// Partial text output (for streaming)
    TextChunk(String),
    /// Final text output
    TextComplete(String),
    /// Model thinking/reasoning output
    ThinkingComplete(String),
    /// Token usage for one LLM round-trip
    UsageReport {
        prompt_tokens: usize,
        completion_tokens: usize,
        duration_ms: u64,
        model: String,
    },
    /// Updated session state for caller to sync back
    SessionSync(Session),
    /// Processing complete
    Done,
    /// Error occurred
    Error(String),
}

/// List all sessions
pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
    let sessions_dir = crate::config::Config::data_dir()
        .map(|d| d.join("sessions"))
        .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;

    if !sessions_dir.exists() {
        return Ok(Vec::new());
    }

    let mut summaries = Vec::new();
    let mut entries = fs::read_dir(&sessions_dir).await?;

    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if path.extension().map(|e| e == "json").unwrap_or(false) {
            if let Ok(content) = fs::read_to_string(&path).await {
                if let Ok(session) = serde_json::from_str::<Session>(&content) {
                    summaries.push(SessionSummary {
                        id: session.id,
                        title: session.title,
                        created_at: session.created_at,
                        updated_at: session.updated_at,
                        message_count: session.messages.len(),
                        agent: session.agent,
                        directory: session.metadata.directory,
                    });
                }
            }
        }
    }

    summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
    Ok(summaries)
}

/// List sessions scoped to a specific directory (workspace)
///
/// Only returns sessions whose `metadata.directory` matches the given path.
/// This prevents sessions from other workspaces "leaking" into the TUI.
pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
    let all = list_sessions().await?;
    let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
    Ok(all
        .into_iter()
        .filter(|s| {
            s.directory
                .as_ref()
                .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
                .unwrap_or(false)
        })
        .collect())
}

/// List sessions including OpenCode sessions for a directory.
///
/// Merges CodeTether sessions with any discovered OpenCode sessions,
/// sorted by most recently updated first. OpenCode sessions are
/// prefixed with `opencode_` in their ID.
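///
/// # Example
///
/// Illustrative sketch (assumes an async context):
///
/// ```ignore
/// let sessions = list_sessions_with_opencode(std::path::Path::new(".")).await?;
/// for s in &sessions {
///     println!("{}  {}", s.updated_at, s.title.as_deref().unwrap_or("(untitled)"));
/// }
/// ```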
pub async fn list_sessions_with_opencode(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
    let mut sessions = list_sessions_for_directory(dir).await?;

    // Also include OpenCode sessions if available
    if let Some(storage) = crate::opencode::OpenCodeStorage::new() {
        if storage.exists() {
            if let Ok(oc_sessions) = storage.list_sessions_for_directory(dir).await {
                for oc in oc_sessions {
                    // Skip if we already have a CodeTether import of this session
                    let import_id = format!("opencode_{}", oc.id);
                    if sessions.iter().any(|s| s.id == import_id) {
                        continue;
                    }

                    sessions.push(SessionSummary {
                        id: import_id,
                        title: Some(format!("[opencode] {}", oc.title)),
                        created_at: oc.created_at,
                        updated_at: oc.updated_at,
                        message_count: oc.message_count,
                        agent: "build".to_string(),
                        directory: Some(PathBuf::from(&oc.directory)),
                    });
                }
            }
        }
    }

    // Re-sort merged list by updated_at descending
    sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
    Ok(sessions)
}

/// Summary of a session for listing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    pub id: String,
    pub title: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub message_count: usize,
    pub agent: String,
    /// The working directory this session was created in
    #[serde(default)]
    pub directory: Option<PathBuf>,
}

fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    let mut chars = value.chars();
    let mut output = String::new();
    for _ in 0..max_chars {
        if let Some(ch) = chars.next() {
            output.push(ch);
        } else {
            return value.to_string();
        }
    }

    if chars.next().is_some() {
        format!("{output}...")
    } else {
        output
    }
}

// Async helper for Vec - kept for potential future use
#[allow(dead_code)]
use futures::StreamExt;

#[allow(dead_code)]
trait AsyncCollect<T> {
    async fn collect(self) -> Vec<T>;
}

#[allow(dead_code)]
impl<S, T> AsyncCollect<T> for S
where
    S: futures::Stream<Item = T> + Unpin,
{
    async fn collect(mut self) -> Vec<T> {
        let mut items = Vec::new();
        while let Some(item) = self.next().await {
            items.push(item);
        }
        items
    }
}
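
// Illustrative unit-test sketch for the pure helpers defined in this module;
// the expected values follow directly from the definitions above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_provider_prefers_known_names() {
        // "openai" sits earlier in the preference list than "google".
        assert_eq!(
            choose_default_provider(&["google", "openai"]),
            Some("openai")
        );
        // Unrecognised providers fall back to the first available entry.
        assert_eq!(choose_default_provider(&["custom"]), Some("custom"));
        assert_eq!(choose_default_provider(&[]), None);
    }

    #[test]
    fn question_is_the_only_interactive_tool() {
        assert!(is_interactive_tool("question"));
        assert!(!is_interactive_tool("bash"));
    }

    #[test]
    fn truncation_appends_ellipsis_only_when_needed() {
        assert_eq!(truncate_with_ellipsis("hello", 10), "hello");
        assert_eq!(truncate_with_ellipsis("hello", 5), "hello");
        assert_eq!(truncate_with_ellipsis("hello world", 5), "hello...");
        assert_eq!(truncate_with_ellipsis("hello", 0), "");
    }

    #[test]
    fn default_models_cover_known_and_unknown_providers() {
        assert_eq!(Session::default_model_for_provider("openai"), "gpt-4o");
        assert_eq!(Session::default_model_for_provider("unknown"), "glm-5");
    }
}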