Skip to main content

codetether_agent/session/
mod.rs

1//! Session management
2//!
3//! Sessions track the conversation history and state for agent interactions.
4
5use crate::agent::ToolUse;
6use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
7use crate::event_stream::ChatEvent;
8use crate::event_stream::s3_sink::S3Sink;
9use crate::provider::{Message, Usage};
10use crate::tool::ToolRegistry;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use std::path::PathBuf;
16use std::sync::Arc;
17use tokio::fs;
18use uuid::Uuid;
19
20#[cfg(feature = "functiongemma")]
21use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
22
23fn is_interactive_tool(tool_name: &str) -> bool {
24    matches!(tool_name, "question")
25}
26
27fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
28    // Keep Google as an explicit option, but don't default to it first because
29    // some environments expose API keys that are not valid for ChatCompletions.
30    let preferred = [
31        "zai",
32        "openai",
33        "github-copilot",
34        "anthropic",
35        "minimax",
36        "openrouter",
37        "novita",
38        "moonshotai",
39        "google",
40    ];
41    for name in preferred {
42        if let Some(found) = providers.iter().copied().find(|p| *p == name) {
43            return Some(found);
44        }
45    }
46    providers.first().copied()
47}
48
49fn prefers_temperature_one(model: &str) -> bool {
50    let normalized = model.to_ascii_lowercase();
51    normalized.contains("kimi-k2") || normalized.contains("glm-") || normalized.contains("minimax")
52}
53
54/// A conversation session
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct Session {
57    pub id: String,
58    pub title: Option<String>,
59    pub created_at: DateTime<Utc>,
60    pub updated_at: DateTime<Utc>,
61    pub messages: Vec<Message>,
62    pub tool_uses: Vec<ToolUse>,
63    pub usage: Usage,
64    pub agent: String,
65    pub metadata: SessionMetadata,
66}
67
68#[derive(Debug, Clone, Default, Serialize, Deserialize)]
69pub struct SessionMetadata {
70    pub directory: Option<PathBuf>,
71    pub model: Option<String>,
72    pub shared: bool,
73    pub share_url: Option<String>,
74}
75
76impl Session {
77    fn default_model_for_provider(provider: &str) -> String {
78        match provider {
79            "moonshotai" => "kimi-k2.5".to_string(),
80            "anthropic" => "claude-sonnet-4-20250514".to_string(),
81            "minimax" => "MiniMax-M2.5".to_string(),
82            "openai" => "gpt-4o".to_string(),
83            "google" => "gemini-2.5-pro".to_string(),
84            "zhipuai" | "zai" => "glm-5".to_string(),
85            // OpenRouter uses model IDs like "z-ai/glm-5".
86            "openrouter" => "z-ai/glm-5".to_string(),
87            "novita" => "qwen/qwen3-coder-next".to_string(),
88            "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
89            _ => "glm-5".to_string(),
90        }
91    }
92
93    /// Create a new session
94    pub async fn new() -> Result<Self> {
95        let id = Uuid::new_v4().to_string();
96        let now = Utc::now();
97
98        Ok(Self {
99            id,
100            title: None,
101            created_at: now,
102            updated_at: now,
103            messages: Vec::new(),
104            tool_uses: Vec::new(),
105            usage: Usage::default(),
106            agent: "build".to_string(),
107            metadata: SessionMetadata {
108                directory: Some(std::env::current_dir()?),
109                ..Default::default()
110            },
111        })
112    }
113
114    /// Load an existing session
115    pub async fn load(id: &str) -> Result<Self> {
116        let path = Self::session_path(id)?;
117        let content = fs::read_to_string(&path).await?;
118        let session: Session = serde_json::from_str(&content)?;
119        Ok(session)
120    }
121
122    /// Load the last session, optionally scoped to a workspace directory
123    ///
124    /// When `workspace` is Some, only considers sessions created in that directory.
125    /// When None, returns the most recent session globally (legacy behavior).
126    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
127        let sessions_dir = Self::sessions_dir()?;
128
129        if !sessions_dir.exists() {
130            anyhow::bail!("No sessions found");
131        }
132
133        let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
134        let mut read_dir = fs::read_dir(&sessions_dir).await?;
135        while let Some(entry) = read_dir.next_entry().await? {
136            entries.push(entry);
137        }
138
139        if entries.is_empty() {
140            anyhow::bail!("No sessions found");
141        }
142
143        // Sort by modification time (most recent first)
144        // Use std::fs::metadata since we can't await in sort_by_key
145        entries.sort_by_key(|e| {
146            std::cmp::Reverse(
147                std::fs::metadata(e.path())
148                    .ok()
149                    .and_then(|m| m.modified().ok())
150                    .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
151            )
152        });
153
154        let canonical_workspace =
155            workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
156
157        for entry in &entries {
158            let content: String = fs::read_to_string(entry.path()).await?;
159            if let Ok(session) = serde_json::from_str::<Session>(&content) {
160                // If workspace scoping requested, filter by directory
161                if let Some(ref ws) = canonical_workspace {
162                    if let Some(ref dir) = session.metadata.directory {
163                        let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
164                        if &canonical_dir == ws {
165                            return Ok(session);
166                        }
167                    }
168                    continue;
169                }
170                return Ok(session);
171            }
172        }
173
174        anyhow::bail!("No sessions found")
175    }
176
177    /// Load the last session (global, unscoped — legacy compatibility)
178    pub async fn last() -> Result<Self> {
179        Self::last_for_directory(None).await
180    }
181
182    /// Save the session to disk
183    pub async fn save(&self) -> Result<()> {
184        let path = Self::session_path(&self.id)?;
185
186        if let Some(parent) = path.parent() {
187            fs::create_dir_all(parent).await?;
188        }
189
190        let content = serde_json::to_string_pretty(self)?;
191        fs::write(&path, content).await?;
192
193        Ok(())
194    }
195
196    /// Add a message to the session
197    pub fn add_message(&mut self, message: Message) {
198        self.messages.push(message);
199        self.updated_at = Utc::now();
200    }
201
202    /// Execute a prompt and get the result
203    pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
204        use crate::provider::{
205            CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
206        };
207
208        // Load providers from Vault
209        let registry = ProviderRegistry::from_vault().await?;
210
211        let providers = registry.list();
212        if providers.is_empty() {
213            anyhow::bail!(
214                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
215            );
216        }
217
218        tracing::info!("Available providers: {:?}", providers);
219
220        // Parse model string (format: "provider/model", "provider", or just "model")
221        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
222            let (prov, model) = parse_model_string(model_str);
223            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
224            if prov.is_some() {
225                // Format: provider/model
226                (prov.map(|s| s.to_string()), model.to_string())
227            } else if providers.contains(&model) {
228                // Format: just provider name (e.g., "novita")
229                (Some(model.to_string()), String::new())
230            } else {
231                // Format: just model name
232                (None, model.to_string())
233            }
234        } else {
235            (None, String::new())
236        };
237
238        // Determine which provider to use with deterministic fallback ordering.
239        let selected_provider = provider_name
240            .as_deref()
241            .filter(|p| providers.contains(p))
242            .or_else(|| choose_default_provider(providers.as_slice()))
243            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
244
245        let provider = registry
246            .get(selected_provider)
247            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
248
249        // Add user message to session using add_message
250        self.add_message(Message {
251            role: Role::User,
252            content: vec![ContentPart::Text {
253                text: message.to_string(),
254            }],
255        });
256
257        // Generate title if this is the first user message and no title exists
258        if self.title.is_none() {
259            self.generate_title().await?;
260        }
261
262        // Determine model to use
263        let model = if !model_id.is_empty() {
264            model_id
265        } else {
266            Self::default_model_for_provider(selected_provider)
267        };
268
269        // Create tool registry with all available tools
270        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
271        let tool_definitions: Vec<_> = tool_registry
272            .definitions()
273            .into_iter()
274            .filter(|tool| !is_interactive_tool(&tool.name))
275            .collect();
276
277        // Some models behave best with temperature=1.0.
278        // - Kimi K2.x requires temperature=1.0
279        // - GLM (Z.AI) defaults to temperature 1.0 for coding workflows
280        // Use contains() to match both short aliases and provider-qualified IDs.
281        let temperature = if prefers_temperature_one(&model) {
282            Some(1.0)
283        } else {
284            Some(0.7)
285        };
286
287        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
288        tracing::info!("Available tools: {}", tool_definitions.len());
289
290        // All current providers support native tool calling.  Hardcode to
291        // true so we skip the expensive list_models() API call on every message.
292        #[cfg(feature = "functiongemma")]
293        let model_supports_tools = true;
294
295        // Build system prompt with AGENTS.md
296        let cwd = self
297            .metadata
298            .directory
299            .clone()
300            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
301        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
302
303        // Run agentic loop with tool execution
304        let max_steps = 50;
305        let mut final_output = String::new();
306
307        // Initialise the FunctionGemma tool-call router (feature-gated, opt-in).
308        #[cfg(feature = "functiongemma")]
309        let tool_router: Option<ToolCallRouter> = {
310            let cfg = ToolRouterConfig::from_env();
311            match ToolCallRouter::from_config(&cfg) {
312                Ok(r) => r,
313                Err(e) => {
314                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
315                    None
316                }
317            }
318        };
319
320        for step in 1..=max_steps {
321            tracing::info!(step = step, "Agent step starting");
322
323            // Build messages with system prompt first
324            let mut messages = vec![Message {
325                role: Role::System,
326                content: vec![ContentPart::Text {
327                    text: system_prompt.clone(),
328                }],
329            }];
330            messages.extend(self.messages.clone());
331
332            // Create completion request with tools
333            let request = CompletionRequest {
334                messages,
335                tools: tool_definitions.clone(),
336                model: model.clone(),
337                temperature,
338                top_p: None,
339                max_tokens: Some(8192),
340                stop: Vec::new(),
341            };
342
343            // Call the provider
344            let response = provider.complete(request).await?;
345
346            // Optionally route text-only responses through FunctionGemma to
347            // produce structured tool calls.  Skipped when the model natively
348            // supports tool calling (which all current providers do).
349            #[cfg(feature = "functiongemma")]
350            let response = if let Some(ref router) = tool_router {
351                router
352                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
353                    .await
354            } else {
355                response
356            };
357
358            // Record token usage
359            crate::telemetry::TOKEN_USAGE.record_model_usage(
360                &model,
361                response.usage.prompt_tokens as u64,
362                response.usage.completion_tokens as u64,
363            );
364
365            // Extract tool calls from response
366            let tool_calls: Vec<(String, String, serde_json::Value)> = response
367                .message
368                .content
369                .iter()
370                .filter_map(|part| {
371                    if let ContentPart::ToolCall {
372                        id,
373                        name,
374                        arguments,
375                    } = part
376                    {
377                        // Parse arguments JSON string into Value
378                        let args: serde_json::Value =
379                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
380                        Some((id.clone(), name.clone(), args))
381                    } else {
382                        None
383                    }
384                })
385                .collect();
386
387            // Collect text output
388            for part in &response.message.content {
389                if let ContentPart::Text { text } = part {
390                    if !text.is_empty() {
391                        final_output.push_str(text);
392                        final_output.push('\n');
393                    }
394                }
395            }
396
397            // If no tool calls, we're done
398            if tool_calls.is_empty() {
399                self.add_message(response.message.clone());
400                break;
401            }
402
403            // Add assistant message with tool calls
404            self.add_message(response.message.clone());
405
406            tracing::info!(
407                step = step,
408                num_tools = tool_calls.len(),
409                "Executing tool calls"
410            );
411
412            // Execute each tool call
413            for (tool_id, tool_name, tool_input) in tool_calls {
414                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
415
416                if is_interactive_tool(&tool_name) {
417                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
418                    self.add_message(Message {
419                        role: Role::Tool,
420                        content: vec![ContentPart::ToolResult {
421                            tool_call_id: tool_id,
422                            content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
423                        }],
424                    });
425                    continue;
426                }
427
428                // Get and execute the tool
429                let exec_start = std::time::Instant::now();
430                let content = if let Some(tool) = tool_registry.get(&tool_name) {
431                    match tool.execute(tool_input.clone()).await {
432                        Ok(result) => {
433                            let duration_ms = exec_start.elapsed().as_millis() as u64;
434                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
435                            if let Some(audit) = try_audit_log() {
436                                audit.log(
437                                    AuditCategory::ToolExecution,
438                                    format!("tool:{}", tool_name),
439                                    if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
440                                    None,
441                                    Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
442                                ).await;
443                            }
444                            result.output
445                        }
446                        Err(e) => {
447                            let duration_ms = exec_start.elapsed().as_millis() as u64;
448                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
449                            if let Some(audit) = try_audit_log() {
450                                audit.log(
451                                    AuditCategory::ToolExecution,
452                                    format!("tool:{}", tool_name),
453                                    AuditOutcome::Failure,
454                                    None,
455                                    Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
456                                ).await;
457                            }
458                            format!("Error: {}", e)
459                        }
460                    }
461                } else {
462                    tracing::warn!(tool = %tool_name, "Tool not found");
463                    if let Some(audit) = try_audit_log() {
464                        audit
465                            .log(
466                                AuditCategory::ToolExecution,
467                                format!("tool:{}", tool_name),
468                                AuditOutcome::Failure,
469                                None,
470                                Some(json!({ "error": "unknown_tool" })),
471                            )
472                            .await;
473                    }
474                    format!("Error: Unknown tool '{}'", tool_name)
475                };
476
477                // Calculate duration for event stream
478                let duration_ms = exec_start.elapsed().as_millis() as u64;
479                let success = !content.starts_with("Error:");
480
481                // Emit event stream event for audit/compliance (SOC 2, FedRAMP, ATO)
482                if let Some(base_dir) = Self::event_stream_path() {
483                    let workspace = std::env::var("PWD")
484                        .map(PathBuf::from)
485                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
486                    let event = ChatEvent::tool_result(
487                        workspace,
488                        self.id.clone(),
489                        &tool_name,
490                        success,
491                        duration_ms,
492                        &content,
493                        self.messages.len() as u64,
494                    );
495                    let event_json = event.to_json();
496                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
497                    let seq = self.messages.len() as u64;
498                    let filename = format!(
499                        "{}-chat-events-{:020}-{:020}.jsonl",
500                        timestamp,
501                        seq * 10000,
502                        (seq + 1) * 10000
503                    );
504                    let event_path = base_dir.join(&self.id).join(filename);
505
506                    let event_path_clone = event_path;
507                    tokio::spawn(async move {
508                        if let Some(parent) = event_path_clone.parent() {
509                            let _ = tokio::fs::create_dir_all(parent).await;
510                        }
511                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
512                            .create(true)
513                            .append(true)
514                            .open(&event_path_clone)
515                            .await
516                        {
517                            use tokio::io::AsyncWriteExt;
518                            let _ = file.write_all(event_json.as_bytes()).await;
519                            let _ = file.write_all(b"\n").await;
520                        }
521                    });
522                }
523
524                // Add tool result message
525                self.add_message(Message {
526                    role: Role::Tool,
527                    content: vec![ContentPart::ToolResult {
528                        tool_call_id: tool_id,
529                        content,
530                    }],
531                });
532            }
533        }
534
535        // Save session after each prompt to persist messages
536        self.save().await?;
537
538        // Archive event stream to S3/R2 if configured (for compliance: SOC 2, FedRAMP, ATO)
539        self.archive_event_stream_to_s3().await;
540
541        Ok(SessionResult {
542            text: final_output.trim().to_string(),
543            session_id: self.id.clone(),
544        })
545    }
546
547    /// Archive event stream files to S3/R2 for immutable compliance logging
548    async fn archive_event_stream_to_s3(&self) {
549        // Check if S3 is configured
550        if !S3Sink::is_configured() {
551            return;
552        }
553
554        let Some(base_dir) = Self::event_stream_path() else {
555            return;
556        };
557
558        let session_event_dir = base_dir.join(&self.id);
559        if !session_event_dir.exists() {
560            return;
561        }
562
563        // Try to create S3 sink
564        let Ok(sink) = S3Sink::from_env().await else {
565            tracing::warn!("Failed to create S3 sink for archival");
566            return;
567        };
568
569        // Upload all event files in the session directory
570        let session_id = self.id.clone();
571        tokio::spawn(async move {
572            if let Ok(mut entries) = tokio::fs::read_dir(&session_event_dir).await {
573                while let Ok(Some(entry)) = entries.next_entry().await {
574                    let path = entry.path();
575                    if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
576                        match sink.upload_file(&path, &session_id).await {
577                            Ok(url) => {
578                                tracing::info!(url = %url, "Archived event stream to S3/R2");
579                            }
580                            Err(e) => {
581                                tracing::warn!(error = %e, "Failed to archive event file to S3");
582                            }
583                        }
584                    }
585                }
586            }
587        });
588    }
589
590    /// Process a user message with real-time event streaming for UI updates.
591    /// Events are sent through the provided channel as tool calls execute.
592    ///
593    /// Accepts a pre-loaded `ProviderRegistry` to avoid re-fetching secrets
594    /// from Vault on every message (which was the primary TUI performance
595    /// bottleneck).
596    pub async fn prompt_with_events(
597        &mut self,
598        message: &str,
599        event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
600        registry: std::sync::Arc<crate::provider::ProviderRegistry>,
601    ) -> Result<SessionResult> {
602        use crate::provider::{CompletionRequest, ContentPart, Role, parse_model_string};
603
604        let _ = event_tx.send(SessionEvent::Thinking).await;
605
606        let providers = registry.list();
607        if providers.is_empty() {
608            anyhow::bail!(
609                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
610            );
611        }
612        tracing::info!("Available providers: {:?}", providers);
613
614        // Parse model string (format: "provider/model", "provider", or just "model")
615        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
616            let (prov, model) = parse_model_string(model_str);
617            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
618            if prov.is_some() {
619                (prov.map(|s| s.to_string()), model.to_string())
620            } else if providers.contains(&model) {
621                (Some(model.to_string()), String::new())
622            } else {
623                (None, model.to_string())
624            }
625        } else {
626            (None, String::new())
627        };
628
629        // Determine which provider to use with deterministic fallback ordering.
630        let selected_provider = provider_name
631            .as_deref()
632            .filter(|p| providers.contains(p))
633            .or_else(|| choose_default_provider(providers.as_slice()))
634            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
635
636        let provider = registry
637            .get(selected_provider)
638            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
639
640        // Add user message
641        self.add_message(Message {
642            role: Role::User,
643            content: vec![ContentPart::Text {
644                text: message.to_string(),
645            }],
646        });
647
648        // Generate title if needed
649        if self.title.is_none() {
650            self.generate_title().await?;
651        }
652
653        // Determine model
654        let model = if !model_id.is_empty() {
655            model_id
656        } else {
657            Self::default_model_for_provider(selected_provider)
658        };
659
660        // Create tool registry
661        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
662        let tool_definitions: Vec<_> = tool_registry
663            .definitions()
664            .into_iter()
665            .filter(|tool| !is_interactive_tool(&tool.name))
666            .collect();
667
668        let temperature = if prefers_temperature_one(&model) {
669            Some(1.0)
670        } else {
671            Some(0.7)
672        };
673
674        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
675        tracing::info!("Available tools: {}", tool_definitions.len());
676
677        // All current providers support native tool calling.  Hardcode to
678        // true so we skip the expensive list_models() API call on every message.
679        #[cfg(feature = "functiongemma")]
680        let model_supports_tools = true;
681
682        // Build system prompt
683        let cwd = std::env::var("PWD")
684            .map(std::path::PathBuf::from)
685            .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
686        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
687
688        let mut final_output = String::new();
689        let max_steps = 50;
690
691        // Initialise the FunctionGemma tool-call router (feature-gated, opt-in).
692        #[cfg(feature = "functiongemma")]
693        let tool_router: Option<ToolCallRouter> = {
694            let cfg = ToolRouterConfig::from_env();
695            match ToolCallRouter::from_config(&cfg) {
696                Ok(r) => r,
697                Err(e) => {
698                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
699                    None
700                }
701            }
702        };
703
704        for step in 1..=max_steps {
705            tracing::info!(step = step, "Agent step starting");
706            let _ = event_tx.send(SessionEvent::Thinking).await;
707
708            // Build messages with system prompt first
709            let mut messages = vec![Message {
710                role: Role::System,
711                content: vec![ContentPart::Text {
712                    text: system_prompt.clone(),
713                }],
714            }];
715            messages.extend(self.messages.clone());
716
717            let request = CompletionRequest {
718                messages,
719                tools: tool_definitions.clone(),
720                model: model.clone(),
721                temperature,
722                top_p: None,
723                max_tokens: Some(8192),
724                stop: Vec::new(),
725            };
726
727            let llm_start = std::time::Instant::now();
728            let response = provider.complete(request).await?;
729            let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
730
731            // Optionally route text-only responses through FunctionGemma to
732            // produce structured tool calls.  Skipped for native tool-calling models.
733            #[cfg(feature = "functiongemma")]
734            let response = if let Some(ref router) = tool_router {
735                router
736                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
737                    .await
738            } else {
739                response
740            };
741
742            crate::telemetry::TOKEN_USAGE.record_model_usage(
743                &model,
744                response.usage.prompt_tokens as u64,
745                response.usage.completion_tokens as u64,
746            );
747
748            // Emit usage report for TUI display
749            let _ = event_tx
750                .send(SessionEvent::UsageReport {
751                    prompt_tokens: response.usage.prompt_tokens,
752                    completion_tokens: response.usage.completion_tokens,
753                    duration_ms: llm_duration_ms,
754                    model: model.clone(),
755                })
756                .await;
757
758            // Extract tool calls
759            let tool_calls: Vec<(String, String, serde_json::Value)> = response
760                .message
761                .content
762                .iter()
763                .filter_map(|part| {
764                    if let ContentPart::ToolCall {
765                        id,
766                        name,
767                        arguments,
768                    } = part
769                    {
770                        let args: serde_json::Value =
771                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
772                        Some((id.clone(), name.clone(), args))
773                    } else {
774                        None
775                    }
776                })
777                .collect();
778
779            // Collect text output for this step
780            // Collect thinking and text output
781            let mut thinking_text = String::new();
782            let mut step_text = String::new();
783            for part in &response.message.content {
784                match part {
785                    ContentPart::Thinking { text } => {
786                        if !text.is_empty() {
787                            thinking_text.push_str(text);
788                            thinking_text.push('\n');
789                        }
790                    }
791                    ContentPart::Text { text } => {
792                        if !text.is_empty() {
793                            step_text.push_str(text);
794                            step_text.push('\n');
795                        }
796                    }
797                    _ => {}
798                }
799            }
800
801            // Emit thinking output first
802            if !thinking_text.trim().is_empty() {
803                let _ = event_tx
804                    .send(SessionEvent::ThinkingComplete(
805                        thinking_text.trim().to_string(),
806                    ))
807                    .await;
808            }
809
810            // Emit this step's text BEFORE tool calls so it appears in correct
811            // chronological order in the TUI chat display.
812            if !step_text.trim().is_empty() {
813                let trimmed = step_text.trim().to_string();
814                let _ = event_tx
815                    .send(SessionEvent::TextChunk(trimmed.clone()))
816                    .await;
817                let _ = event_tx.send(SessionEvent::TextComplete(trimmed)).await;
818                final_output.push_str(&step_text);
819            }
820
821            if tool_calls.is_empty() {
822                self.add_message(response.message.clone());
823                break;
824            }
825
826            self.add_message(response.message.clone());
827
828            tracing::info!(
829                step = step,
830                num_tools = tool_calls.len(),
831                "Executing tool calls"
832            );
833
834            // Execute each tool call with events
835            for (tool_id, tool_name, tool_input) in tool_calls {
836                let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
837                let _ = event_tx
838                    .send(SessionEvent::ToolCallStart {
839                        name: tool_name.clone(),
840                        arguments: args_str,
841                    })
842                    .await;
843
844                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
845
846                if is_interactive_tool(&tool_name) {
847                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
848                    let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
849                    let _ = event_tx
850                        .send(SessionEvent::ToolCallComplete {
851                            name: tool_name.clone(),
852                            output: content.clone(),
853                            success: false,
854                        })
855                        .await;
856                    self.add_message(Message {
857                        role: Role::Tool,
858                        content: vec![ContentPart::ToolResult {
859                            tool_call_id: tool_id,
860                            content,
861                        }],
862                    });
863                    continue;
864                }
865
866                let exec_start = std::time::Instant::now();
867                let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
868                    match tool.execute(tool_input.clone()).await {
869                        Ok(result) => {
870                            let duration_ms = exec_start.elapsed().as_millis() as u64;
871                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
872                            if let Some(audit) = try_audit_log() {
873                                audit.log(
874                                    AuditCategory::ToolExecution,
875                                    format!("tool:{}", tool_name),
876                                    if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
877                                    None,
878                                    Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
879                                ).await;
880                            }
881                            (result.output, result.success)
882                        }
883                        Err(e) => {
884                            let duration_ms = exec_start.elapsed().as_millis() as u64;
885                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
886                            if let Some(audit) = try_audit_log() {
887                                audit.log(
888                                    AuditCategory::ToolExecution,
889                                    format!("tool:{}", tool_name),
890                                    AuditOutcome::Failure,
891                                    None,
892                                    Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
893                                ).await;
894                            }
895                            (format!("Error: {}", e), false)
896                        }
897                    }
898                } else {
899                    tracing::warn!(tool = %tool_name, "Tool not found");
900                    if let Some(audit) = try_audit_log() {
901                        audit
902                            .log(
903                                AuditCategory::ToolExecution,
904                                format!("tool:{}", tool_name),
905                                AuditOutcome::Failure,
906                                None,
907                                Some(json!({ "error": "unknown_tool" })),
908                            )
909                            .await;
910                    }
911                    (format!("Error: Unknown tool '{}'", tool_name), false)
912                };
913
914                // Calculate total duration from exec_start (captured from line 772)
915                let duration_ms = exec_start.elapsed().as_millis() as u64;
916
917                // Emit event stream event for audit/compliance (SOC 2, FedRAMP, ATO)
918                // This creates the structured JSONL record with byte-range offsets
919                // File format: {timestamp}-chat-events-{start_byte}-{end_byte}.jsonl
920                if let Some(base_dir) = Self::event_stream_path() {
921                    let workspace = std::env::var("PWD")
922                        .map(PathBuf::from)
923                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
924                    let event = ChatEvent::tool_result(
925                        workspace,
926                        self.id.clone(),
927                        &tool_name,
928                        success,
929                        duration_ms,
930                        &content,
931                        self.messages.len() as u64,
932                    );
933                    let event_json = event.to_json();
934                    let event_size = event_json.len() as u64 + 1; // +1 for newline
935
936                    // Generate filename with byte-range offsets for random access replay
937                    // Format: {timestamp}-chat-events-{start_offset}-{end_offset}.jsonl
938                    // We use a session-scoped counter stored in metadata for byte tracking
939                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
940                    let seq = self.messages.len() as u64;
941                    let filename = format!(
942                        "{}-chat-events-{:020}-{:020}.jsonl",
943                        timestamp,
944                        seq * 10000,       // approximated start offset
945                        (seq + 1) * 10000  // approximated end offset
946                    );
947                    let event_path = base_dir.join(&self.id).join(filename);
948
949                    // Fire-and-forget: don't block tool execution on event logging
950                    let event_path_clone = event_path;
951                    tokio::spawn(async move {
952                        if let Some(parent) = event_path_clone.parent() {
953                            let _ = tokio::fs::create_dir_all(parent).await;
954                        }
955                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
956                            .create(true)
957                            .append(true)
958                            .open(&event_path_clone)
959                            .await
960                        {
961                            use tokio::io::AsyncWriteExt;
962                            let _ = file.write_all(event_json.as_bytes()).await;
963                            let _ = file.write_all(b"\n").await;
964                            tracing::debug!(path = %event_path_clone.display(), size = event_size, "Event stream wrote");
965                        }
966                    });
967                }
968
969                let _ = event_tx
970                    .send(SessionEvent::ToolCallComplete {
971                        name: tool_name.clone(),
972                        output: content.clone(),
973                        success,
974                    })
975                    .await;
976
977                self.add_message(Message {
978                    role: Role::Tool,
979                    content: vec![ContentPart::ToolResult {
980                        tool_call_id: tool_id,
981                        content,
982                    }],
983                });
984            }
985        }
986
987        self.save().await?;
988
989        // Archive event stream to S3/R2 if configured (for compliance: SOC 2, FedRAMP, ATO)
990        self.archive_event_stream_to_s3().await;
991
992        // Text was already sent per-step via TextComplete events.
993        // Send updated session state so the caller can sync back.
994        let _ = event_tx.send(SessionEvent::SessionSync(self.clone())).await;
995        let _ = event_tx.send(SessionEvent::Done).await;
996
997        Ok(SessionResult {
998            text: final_output.trim().to_string(),
999            session_id: self.id.clone(),
1000        })
1001    }
1002
1003    /// Generate a title for the session based on the first message
1004    /// Only sets title if not already set (for initial title generation)
1005    pub async fn generate_title(&mut self) -> Result<()> {
1006        if self.title.is_some() {
1007            return Ok(());
1008        }
1009
1010        // Get first user message
1011        let first_message = self
1012            .messages
1013            .iter()
1014            .find(|m| m.role == crate::provider::Role::User);
1015
1016        if let Some(msg) = first_message {
1017            let text: String = msg
1018                .content
1019                .iter()
1020                .filter_map(|p| match p {
1021                    crate::provider::ContentPart::Text { text } => Some(text.clone()),
1022                    _ => None,
1023                })
1024                .collect::<Vec<_>>()
1025                .join(" ");
1026
1027            // Truncate to reasonable length
1028            self.title = Some(truncate_with_ellipsis(&text, 47));
1029        }
1030
1031        Ok(())
1032    }
1033
1034    /// Regenerate the title based on the first message, even if already set
1035    /// Use this for on-demand title updates or after context changes
1036    pub async fn regenerate_title(&mut self) -> Result<()> {
1037        // Get first user message
1038        let first_message = self
1039            .messages
1040            .iter()
1041            .find(|m| m.role == crate::provider::Role::User);
1042
1043        if let Some(msg) = first_message {
1044            let text: String = msg
1045                .content
1046                .iter()
1047                .filter_map(|p| match p {
1048                    crate::provider::ContentPart::Text { text } => Some(text.clone()),
1049                    _ => None,
1050                })
1051                .collect::<Vec<_>>()
1052                .join(" ");
1053
1054            // Truncate to reasonable length
1055            self.title = Some(truncate_with_ellipsis(&text, 47));
1056        }
1057
1058        Ok(())
1059    }
1060
1061    /// Set a custom title for the session
1062    pub fn set_title(&mut self, title: impl Into<String>) {
1063        self.title = Some(title.into());
1064        self.updated_at = Utc::now();
1065    }
1066
1067    /// Clear the title, allowing it to be regenerated
1068    pub fn clear_title(&mut self) {
1069        self.title = None;
1070        self.updated_at = Utc::now();
1071    }
1072
1073    /// Handle context change - updates metadata and optionally regenerates title
1074    /// Call this when the session context changes (e.g., directory change, model change)
1075    pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
1076        self.updated_at = Utc::now();
1077
1078        if regenerate_title {
1079            self.regenerate_title().await?;
1080        }
1081
1082        Ok(())
1083    }
1084
1085    /// Import an OpenCode session into CodeTether
1086    ///
1087    /// Loads messages and parts from OpenCode storage and converts them
1088    /// into a CodeTether session that can be resumed.
1089    pub async fn from_opencode(
1090        session_id: &str,
1091        storage: &crate::opencode::OpenCodeStorage,
1092    ) -> Result<Self> {
1093        let oc_session = storage.load_session(session_id).await?;
1094        let oc_messages = storage.load_messages(session_id).await?;
1095
1096        let mut messages_with_parts = Vec::new();
1097        for msg in oc_messages {
1098            let parts = storage.load_parts(&msg.id).await?;
1099            messages_with_parts.push((msg, parts));
1100        }
1101
1102        crate::opencode::convert::to_codetether_session(&oc_session, messages_with_parts).await
1103    }
1104
1105    /// Try to load the last OpenCode session for a directory as a fallback
1106    pub async fn last_opencode_for_directory(dir: &std::path::Path) -> Result<Self> {
1107        let storage = crate::opencode::OpenCodeStorage::new()
1108            .ok_or_else(|| anyhow::anyhow!("OpenCode storage directory not found"))?;
1109
1110        if !storage.exists() {
1111            anyhow::bail!("OpenCode storage does not exist");
1112        }
1113
1114        let oc_session = storage.last_session_for_directory(dir).await?;
1115        Self::from_opencode(&oc_session.id, &storage).await
1116    }
1117
1118    /// Delete a session by ID
1119    pub async fn delete(id: &str) -> Result<()> {
1120        let path = Self::session_path(id)?;
1121        if path.exists() {
1122            tokio::fs::remove_file(&path).await?;
1123        }
1124        Ok(())
1125    }
1126
1127    /// Get the sessions directory
1128    fn sessions_dir() -> Result<PathBuf> {
1129        crate::config::Config::data_dir()
1130            .map(|d| d.join("sessions"))
1131            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
1132    }
1133
1134    /// Get the path for a session file
1135    fn session_path(id: &str) -> Result<PathBuf> {
1136        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
1137    }
1138
1139    /// Get the event stream path from environment, if configured.
1140    /// Returns None if CODETETHER_EVENT_STREAM_PATH is not set.
1141    fn event_stream_path() -> Option<PathBuf> {
1142        std::env::var("CODETETHER_EVENT_STREAM_PATH")
1143            .ok()
1144            .map(PathBuf::from)
1145    }
1146}
1147
1148/// Result from a session prompt
1149#[derive(Debug, Clone, Serialize, Deserialize)]
1150pub struct SessionResult {
1151    pub text: String,
1152    pub session_id: String,
1153}
1154
1155/// Events emitted during session processing for real-time UI updates
1156#[derive(Debug, Clone)]
1157pub enum SessionEvent {
1158    /// Agent is thinking/processing
1159    Thinking,
1160    /// Tool call started
1161    ToolCallStart { name: String, arguments: String },
1162    /// Tool call completed with result
1163    ToolCallComplete {
1164        name: String,
1165        output: String,
1166        success: bool,
1167    },
1168    /// Partial text output (for streaming)
1169    TextChunk(String),
1170    /// Final text output
1171    TextComplete(String),
1172    /// Model thinking/reasoning output
1173    ThinkingComplete(String),
1174    /// Token usage for one LLM round-trip
1175    UsageReport {
1176        prompt_tokens: usize,
1177        completion_tokens: usize,
1178        duration_ms: u64,
1179        model: String,
1180    },
1181    /// Updated session state for caller to sync back
1182    SessionSync(Session),
1183    /// Processing complete
1184    Done,
1185    /// Error occurred
1186    Error(String),
1187}
1188
1189/// List all sessions
1190pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
1191    let sessions_dir = crate::config::Config::data_dir()
1192        .map(|d| d.join("sessions"))
1193        .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
1194
1195    if !sessions_dir.exists() {
1196        return Ok(Vec::new());
1197    }
1198
1199    let mut summaries = Vec::new();
1200    let mut entries = fs::read_dir(&sessions_dir).await?;
1201
1202    while let Some(entry) = entries.next_entry().await? {
1203        let path = entry.path();
1204        if path.extension().map(|e| e == "json").unwrap_or(false) {
1205            if let Ok(content) = fs::read_to_string(&path).await {
1206                if let Ok(session) = serde_json::from_str::<Session>(&content) {
1207                    summaries.push(SessionSummary {
1208                        id: session.id,
1209                        title: session.title,
1210                        created_at: session.created_at,
1211                        updated_at: session.updated_at,
1212                        message_count: session.messages.len(),
1213                        agent: session.agent,
1214                        directory: session.metadata.directory,
1215                    });
1216                }
1217            }
1218        }
1219    }
1220
1221    summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1222    Ok(summaries)
1223}
1224
1225/// List sessions scoped to a specific directory (workspace)
1226///
1227/// Only returns sessions whose `metadata.directory` matches the given path.
1228/// This prevents sessions from other workspaces "leaking" into the TUI.
1229pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1230    let all = list_sessions().await?;
1231    let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
1232    Ok(all
1233        .into_iter()
1234        .filter(|s| {
1235            s.directory
1236                .as_ref()
1237                .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
1238                .unwrap_or(false)
1239        })
1240        .collect())
1241}
1242
1243/// List sessions including OpenCode sessions for a directory.
1244///
1245/// Merges CodeTether sessions with any discovered OpenCode sessions,
1246/// sorted by most recently updated first. OpenCode sessions are
1247/// prefixed with `opencode_` in their ID.
1248pub async fn list_sessions_with_opencode(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1249    let mut sessions = list_sessions_for_directory(dir).await?;
1250
1251    // Also include OpenCode sessions if available
1252    if let Some(storage) = crate::opencode::OpenCodeStorage::new() {
1253        if storage.exists() {
1254            if let Ok(oc_sessions) = storage.list_sessions_for_directory(dir).await {
1255                for oc in oc_sessions {
1256                    // Skip if we already have a CodeTether import of this session
1257                    let import_id = format!("opencode_{}", oc.id);
1258                    if sessions.iter().any(|s| s.id == import_id) {
1259                        continue;
1260                    }
1261
1262                    sessions.push(SessionSummary {
1263                        id: import_id,
1264                        title: Some(format!("[opencode] {}", oc.title)),
1265                        created_at: oc.created_at,
1266                        updated_at: oc.updated_at,
1267                        message_count: oc.message_count,
1268                        agent: "build".to_string(),
1269                        directory: Some(PathBuf::from(&oc.directory)),
1270                    });
1271                }
1272            }
1273        }
1274    }
1275
1276    // Re-sort merged list by updated_at descending
1277    sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1278    Ok(sessions)
1279}
1280
1281/// Summary of a session for listing
1282#[derive(Debug, Clone, Serialize, Deserialize)]
1283pub struct SessionSummary {
1284    pub id: String,
1285    pub title: Option<String>,
1286    pub created_at: DateTime<Utc>,
1287    pub updated_at: DateTime<Utc>,
1288    pub message_count: usize,
1289    pub agent: String,
1290    /// The working directory this session was created in
1291    #[serde(default)]
1292    pub directory: Option<PathBuf>,
1293}
1294
1295fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
1296    if max_chars == 0 {
1297        return String::new();
1298    }
1299
1300    let mut chars = value.chars();
1301    let mut output = String::new();
1302    for _ in 0..max_chars {
1303        if let Some(ch) = chars.next() {
1304            output.push(ch);
1305        } else {
1306            return value.to_string();
1307        }
1308    }
1309
1310    if chars.next().is_some() {
1311        format!("{output}...")
1312    } else {
1313        output
1314    }
1315}
1316
1317// Async helper for Vec - kept for potential future use
1318#[allow(dead_code)]
1319use futures::StreamExt;
1320
1321#[allow(dead_code)]
1322trait AsyncCollect<T> {
1323    async fn collect(self) -> Vec<T>;
1324}
1325
1326#[allow(dead_code)]
1327impl<S, T> AsyncCollect<T> for S
1328where
1329    S: futures::Stream<Item = T> + Unpin,
1330{
1331    async fn collect(mut self) -> Vec<T> {
1332        let mut items = Vec::new();
1333        while let Some(item) = self.next().await {
1334            items.push(item);
1335        }
1336        items
1337    }
1338}