// codetether_agent/session/mod.rs

1//! Session management
2//!
3//! Sessions track the conversation history and state for agent interactions.
4
5use crate::agent::ToolUse;
6use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
7use crate::event_stream::ChatEvent;
8use crate::event_stream::s3_sink::S3Sink;
9use crate::provider::{Message, Usage};
10use crate::rlm::router::AutoProcessContext;
11use crate::rlm::{RlmChunker, RlmConfig, RlmRouter, RoutingContext};
12use crate::tool::ToolRegistry;
13use anyhow::Result;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::path::PathBuf;
18use std::sync::Arc;
19use tokio::fs;
20use uuid::Uuid;
21
22use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
23
/// An image attachment to include with a message (from clipboard paste, etc.)
#[derive(Debug, Clone)]
pub struct ImageAttachment {
    /// Base64-encoded data URL (e.g., "data:image/png;base64,...")
    pub data_url: String,
    /// MIME type (e.g., "image/png"); `None` when the source did not report one
    pub mime_type: Option<String>,
}
32
/// Whether a tool requires a live user prompt and therefore must be blocked
/// inside the non-interactive session loop. Currently only "question".
fn is_interactive_tool(tool_name: &str) -> bool {
    tool_name == "question"
}
36
/// Pick a default provider from `providers` using a fixed preference order.
///
/// Google stays available as an explicit choice but is deliberately last,
/// because some environments expose API keys that are not valid for
/// ChatCompletions. When none of the preferred names is configured, the
/// first configured provider is used; `None` only when `providers` is empty.
fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
    let preferred: [&'a str; 9] = [
        "zai",
        "openai",
        "github-copilot",
        "anthropic",
        "minimax",
        "openrouter",
        "novita",
        "moonshotai",
        "google",
    ];
    preferred
        .into_iter()
        .find(|name| providers.contains(name))
        .or_else(|| providers.first().copied())
}
58
/// Models that behave best with temperature fixed at 1.0 (Kimi K2.x, GLM,
/// MiniMax). The match is case-insensitive and substring-based so both short
/// aliases and provider-qualified IDs are recognized.
fn prefers_temperature_one(model: &str) -> bool {
    let lowered = model.to_ascii_lowercase();
    ["kimi-k2", "glm-", "minimax"]
        .iter()
        .any(|needle| lowered.contains(needle))
}
63
/// Return the context window size (in tokens) for known models.
///
/// Matching is case-insensitive and substring-based so both short aliases
/// and provider-qualified IDs resolve; unknown models fall back to a
/// conservative 128k window.
fn context_window_for_model(model: &str) -> usize {
    // First match wins — the table order mirrors the priority of the
    // original if/else chain.
    const WINDOWS: [(&str, usize); 10] = [
        ("kimi-k2", 256_000),
        ("glm-5", 200_000),
        ("glm5", 200_000),
        ("gpt-4o", 128_000),
        ("gpt-5", 256_000),
        ("claude", 200_000),
        ("gemini", 1_000_000),
        ("minimax", 256_000),
        ("m2.5", 256_000),
        ("qwen", 131_072),
    ];
    let lowered = model.to_ascii_lowercase();
    WINDOWS
        .iter()
        .find(|(needle, _)| lowered.contains(needle))
        .map(|&(_, window)| window)
        .unwrap_or(128_000) // conservative default
}
87
/// A conversation session
///
/// Holds the message history, tool usage, and token accounting for one agent
/// conversation. Serialized to/from JSON on disk (see `save`/`load`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Unique identifier (UUID v4, see `new`); used to locate the session file.
    pub id: String,
    /// Human-readable title; `None` until generated on the first prompt.
    pub title: Option<String>,
    /// When the session was created.
    pub created_at: DateTime<Utc>,
    /// Last time a message was added (bumped by `add_message`).
    pub updated_at: DateTime<Utc>,
    /// Full conversation history: user, assistant, and tool messages.
    pub messages: Vec<Message>,
    /// Tool uses associated with the session.
    pub tool_uses: Vec<ToolUse>,
    /// Token usage totals for the session.
    pub usage: Usage,
    /// Name of the agent driving this session (defaults to "build").
    pub agent: String,
    /// Workspace directory, model selection, and sharing state.
    pub metadata: SessionMetadata,
    /// Optional bus for publishing agent thinking/reasoning to training pipeline
    #[serde(skip)]
    pub bus: Option<Arc<crate::bus::AgentBus>>,
}
104
/// Auxiliary session state persisted alongside the conversation.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionMetadata {
    /// Workspace directory the session was created in; used to scope
    /// `last_for_directory` lookups.
    pub directory: Option<PathBuf>,
    /// Requested model string: "provider/model", a bare provider name, or a
    /// bare model name (parsed in `prompt`).
    pub model: Option<String>,
    /// Whether the session has been shared.
    pub shared: bool,
    /// Share URL, when the session has been shared.
    pub share_url: Option<String>,
}
112
113impl Session {
114    fn default_model_for_provider(provider: &str) -> String {
115        match provider {
116            "moonshotai" => "kimi-k2.5".to_string(),
117            "anthropic" => "claude-sonnet-4-20250514".to_string(),
118            "minimax" => "MiniMax-M2.5".to_string(),
119            "openai" => "gpt-4o".to_string(),
120            "google" => "gemini-2.5-pro".to_string(),
121            "zhipuai" | "zai" => "glm-5".to_string(),
122            // OpenRouter uses model IDs like "z-ai/glm-5".
123            "openrouter" => "z-ai/glm-5".to_string(),
124            "novita" => "qwen/qwen3-coder-next".to_string(),
125            "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
126            _ => "glm-5".to_string(),
127        }
128    }
129
130    /// Create a new session
131    pub async fn new() -> Result<Self> {
132        let id = Uuid::new_v4().to_string();
133        let now = Utc::now();
134
135        Ok(Self {
136            id,
137            title: None,
138            created_at: now,
139            updated_at: now,
140            messages: Vec::new(),
141            tool_uses: Vec::new(),
142            usage: Usage::default(),
143            agent: "build".to_string(),
144            metadata: SessionMetadata {
145                directory: Some(std::env::current_dir()?),
146                ..Default::default()
147            },
148            bus: None,
149        })
150    }
151
    /// Attach a bus for publishing agent thinking/reasoning
    ///
    /// Builder-style: consumes and returns `self` so it can be chained after
    /// construction. The bus is not serialized (`#[serde(skip)]` on the field).
    pub fn with_bus(mut self, bus: Arc<crate::bus::AgentBus>) -> Self {
        self.bus = Some(bus);
        self
    }
157
158    /// Load an existing session
159    pub async fn load(id: &str) -> Result<Self> {
160        let path = Self::session_path(id)?;
161        let content = fs::read_to_string(&path).await?;
162        let session: Session = serde_json::from_str(&content)?;
163        Ok(session)
164    }
165
166    /// Load the last session, optionally scoped to a workspace directory
167    ///
168    /// When `workspace` is Some, only considers sessions created in that directory.
169    /// When None, returns the most recent session globally (legacy behavior).
170    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
171        let sessions_dir = Self::sessions_dir()?;
172
173        if !sessions_dir.exists() {
174            anyhow::bail!("No sessions found");
175        }
176
177        let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
178        let mut read_dir = fs::read_dir(&sessions_dir).await?;
179        while let Some(entry) = read_dir.next_entry().await? {
180            entries.push(entry);
181        }
182
183        if entries.is_empty() {
184            anyhow::bail!("No sessions found");
185        }
186
187        // Sort by modification time (most recent first)
188        // Use std::fs::metadata since we can't await in sort_by_key
189        entries.sort_by_key(|e| {
190            std::cmp::Reverse(
191                std::fs::metadata(e.path())
192                    .ok()
193                    .and_then(|m| m.modified().ok())
194                    .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
195            )
196        });
197
198        let canonical_workspace =
199            workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
200
201        for entry in &entries {
202            let content: String = fs::read_to_string(entry.path()).await?;
203            if let Ok(session) = serde_json::from_str::<Session>(&content) {
204                // If workspace scoping requested, filter by directory
205                if let Some(ref ws) = canonical_workspace {
206                    if let Some(ref dir) = session.metadata.directory {
207                        let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
208                        if &canonical_dir == ws {
209                            return Ok(session);
210                        }
211                    }
212                    continue;
213                }
214                return Ok(session);
215            }
216        }
217
218        anyhow::bail!("No sessions found")
219    }
220
    /// Load the last session (global, unscoped — legacy compatibility)
    ///
    /// Thin wrapper over `last_for_directory(None)`.
    pub async fn last() -> Result<Self> {
        Self::last_for_directory(None).await
    }
225
226    /// Save the session to disk
227    pub async fn save(&self) -> Result<()> {
228        let path = Self::session_path(&self.id)?;
229
230        if let Some(parent) = path.parent() {
231            fs::create_dir_all(parent).await?;
232        }
233
234        let content = serde_json::to_string_pretty(self)?;
235        fs::write(&path, content).await?;
236
237        Ok(())
238    }
239
    /// Add a message to the session
    ///
    /// Appends to the history and records the modification time.
    pub fn add_message(&mut self, message: Message) {
        self.messages.push(message);
        self.updated_at = Utc::now();
    }
245
    /// Execute a prompt and get the result
    ///
    /// Runs one full agentic turn:
    /// 1. Loads providers from Vault and resolves a provider/model pair — an
    ///    explicit `metadata.model` wins; otherwise a deterministic fallback
    ///    order is used (`choose_default_provider`).
    /// 2. Appends the user message, compressing it via RLM when it exceeds
    ///    ~35% of the model's context window.
    /// 3. Loops up to `max_steps` times: calls the provider, executes any
    ///    requested tools (with audit logging and event-stream emission),
    ///    routes oversized tool output through RLM, and feeds results back.
    ///    The loop ends early when the model responds without tool calls.
    /// 4. Saves the session and archives the event stream to S3/R2.
    ///
    /// Returns the accumulated assistant text and this session's id.
    pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
        use crate::provider::{
            CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
        };

        // Load providers from Vault
        let registry = ProviderRegistry::from_vault().await?;

        let providers = registry.list();
        if providers.is_empty() {
            anyhow::bail!(
                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
            );
        }

        tracing::info!("Available providers: {:?}", providers);

        // Parse model string (format: "provider/model", "provider", or just "model")
        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
            let (prov, model) = parse_model_string(model_str);
            // Normalize the legacy "zhipuai" provider alias to "zai".
            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
            if prov.is_some() {
                // Format: provider/model
                (prov.map(|s| s.to_string()), model.to_string())
            } else if providers.contains(&model) {
                // Format: just provider name (e.g., "novita")
                (Some(model.to_string()), String::new())
            } else {
                // Format: just model name
                (None, model.to_string())
            }
        } else {
            (None, String::new())
        };

        // Determine which provider to use with deterministic fallback ordering.
        let selected_provider = provider_name
            .as_deref()
            .filter(|p| providers.contains(p))
            .or_else(|| choose_default_provider(providers.as_slice()))
            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;

        let provider = registry
            .get(selected_provider)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

        // Add user message to session using add_message
        self.add_message(Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: message.to_string(),
            }],
        });

        // Generate title if this is the first user message and no title exists
        if self.title.is_none() {
            self.generate_title().await?;
        }

        // Determine model to use
        let model = if !model_id.is_empty() {
            model_id
        } else {
            Self::default_model_for_provider(selected_provider)
        };

        // Compress oversized user message via RLM if it exceeds the context threshold
        {
            let ctx_window = context_window_for_model(&model);
            let msg_tokens = RlmChunker::estimate_tokens(message);
            // Compress when the message alone would consume more than ~35%
            // of the model's context window.
            let threshold = (ctx_window as f64 * 0.35) as usize;
            if msg_tokens > threshold {
                tracing::info!(
                    msg_tokens,
                    threshold,
                    ctx_window,
                    "RLM: User message exceeds context threshold, compressing"
                );
                let auto_ctx = AutoProcessContext {
                    tool_id: "session_context",
                    tool_args: serde_json::json!({}),
                    session_id: &self.id,
                    abort: None,
                    on_progress: None,
                    provider: Arc::clone(&provider),
                    model: model.clone(),
                };
                let rlm_config = RlmConfig::default();
                match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
                    Ok(result) => {
                        tracing::info!(
                            input_tokens = result.stats.input_tokens,
                            output_tokens = result.stats.output_tokens,
                            "RLM: User message compressed"
                        );
                        // Replace the last message (user message we just added)
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text {
                                text: format!(
                                    "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
                                    msg_tokens,
                                    result.processed,
                                    message.chars().take(500).collect::<String>()
                                ),
                            }];
                        }
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
                        // NOTE(review): `threshold * 4` then `/ 4` cancels out —
                        // the compress target ends up equal to `threshold`;
                        // confirm this round-trip is intentional.
                        let max_chars = threshold * 4;
                        let truncated = RlmChunker::compress(message, max_chars / 4, None);
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text { text: truncated }];
                        }
                    }
                }
            }
        }

        // Create tool registry with all available tools
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        // Interactive tools are excluded up front: there is no live user to
        // answer them in this loop.
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|tool| !is_interactive_tool(&tool.name))
            .collect();

        // Some models behave best with temperature=1.0.
        // - Kimi K2.x requires temperature=1.0
        // - GLM (Z.AI) defaults to temperature 1.0 for coding workflows
        // Use contains() to match both short aliases and provider-qualified IDs.
        let temperature = if prefers_temperature_one(&model) {
            Some(1.0)
        } else {
            Some(0.7)
        };

        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
        tracing::info!("Available tools: {}", tool_definitions.len());

        // All current providers support native tool calling.  Hardcode to
        // true so we skip the expensive list_models() API call on every message.

        let model_supports_tools = true;

        // Build system prompt with AGENTS.md
        let cwd = self
            .metadata
            .directory
            .clone()
            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);

        // Run agentic loop with tool execution
        let max_steps = 50;
        let mut final_output = String::new();

        // Initialise the FunctionGemma tool-call router (feature-gated, opt-in).

        let tool_router: Option<ToolCallRouter> = {
            let cfg = ToolRouterConfig::from_env();
            match ToolCallRouter::from_config(&cfg) {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                    None
                }
            }
        };

        for step in 1..=max_steps {
            tracing::info!(step = step, "Agent step starting");

            // Build messages with system prompt first
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            messages.extend(self.messages.clone());

            // Create completion request with tools
            let request = CompletionRequest {
                messages,
                tools: tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(8192),
                stop: Vec::new(),
            };

            // Call the provider
            let response = provider.complete(request).await?;

            // Optionally route text-only responses through FunctionGemma to
            // produce structured tool calls.  Skipped when the model natively
            // supports tool calling (which all current providers do).

            let response = if let Some(ref router) = tool_router {
                router
                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
                    .await
            } else {
                response
            };

            // Record token usage
            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &model,
                response.usage.prompt_tokens as u64,
                response.usage.completion_tokens as u64,
            );

            // Extract tool calls from response
            let tool_calls: Vec<(String, String, serde_json::Value)> = response
                .message
                .content
                .iter()
                .filter_map(|part| {
                    if let ContentPart::ToolCall {
                        id,
                        name,
                        arguments,
                        ..
                    } = part
                    {
                        // Parse arguments JSON string into Value
                        let args: serde_json::Value =
                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                        Some((id.clone(), name.clone(), args))
                    } else {
                        None
                    }
                })
                .collect();

            // Collect text output and publish thinking to bus
            for part in &response.message.content {
                match part {
                    ContentPart::Text { text } if !text.is_empty() => {
                        final_output.push_str(text);
                        final_output.push('\n');
                    }
                    ContentPart::Thinking { text } if !text.is_empty() => {
                        if let Some(ref bus) = self.bus {
                            let handle = bus.handle(&self.agent);
                            handle.send(
                                format!("agent.{}.thinking", self.agent),
                                crate::bus::BusMessage::AgentThinking {
                                    agent_id: self.agent.clone(),
                                    thinking: text.clone(),
                                    step,
                                },
                            );
                        }
                    }
                    _ => {}
                }
            }

            // If no tool calls, we're done
            if tool_calls.is_empty() {
                self.add_message(response.message.clone());
                break;
            }

            // Add assistant message with tool calls
            self.add_message(response.message.clone());

            tracing::info!(
                step = step,
                num_tools = tool_calls.len(),
                "Executing tool calls"
            );

            // Execute each tool call
            for (tool_id, tool_name, tool_input) in tool_calls {
                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

                // Publish tool request to bus for training pipeline
                if let Some(ref bus) = self.bus {
                    let handle = bus.handle(&self.agent);
                    handle.send(
                        format!("agent.{}.tool.request", self.agent),
                        crate::bus::BusMessage::ToolRequest {
                            request_id: tool_id.clone(),
                            agent_id: self.agent.clone(),
                            tool_name: tool_name.clone(),
                            arguments: tool_input.clone(),
                        },
                    );
                }

                // Defence in depth: interactive tools are filtered out of
                // `tool_definitions` above, but the model may still emit a
                // call to one.
                if is_interactive_tool(&tool_name) {
                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                    self.add_message(Message {
                        role: Role::Tool,
                        content: vec![ContentPart::ToolResult {
                            tool_call_id: tool_id,
                            content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
                        }],
                    });
                    continue;
                }

                // Get and execute the tool
                let exec_start = std::time::Instant::now();
                let content = if let Some(tool) = tool_registry.get(&tool_name) {
                    match tool.execute(tool_input.clone()).await {
                        Ok(result) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                        AuditCategory::ToolExecution,
                                        format!("tool:{}", tool_name),
                                        if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
                                        None,
                                        Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
                                        None,  // okr_id
                                        None,  // okr_run_id
                                        None,  // relay_id
                                        Some(self.id.clone()),  // session_id
                                    ).await;
                            }
                            result.output
                        }
                        Err(e) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                        AuditCategory::ToolExecution,
                                        format!("tool:{}", tool_name),
                                        AuditOutcome::Failure,
                                        None,
                                        Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                                        None,  // okr_id
                                        None,  // okr_run_id
                                        None,  // relay_id
                                        Some(self.id.clone()),  // session_id
                                    ).await;
                            }
                            format!("Error: {}", e)
                        }
                    }
                } else {
                    tracing::warn!(tool = %tool_name, "Tool not found");
                    if let Some(audit) = try_audit_log() {
                        audit
                            .log_with_correlation(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                AuditOutcome::Failure,
                                None,
                                Some(json!({ "error": "unknown_tool" })),
                                None,                  // okr_id
                                None,                  // okr_run_id
                                None,                  // relay_id
                                Some(self.id.clone()), // session_id
                            )
                            .await;
                    }
                    format!("Error: Unknown tool '{}'", tool_name)
                };

                // Calculate duration for event stream
                let duration_ms = exec_start.elapsed().as_millis() as u64;
                // Heuristic: failures above are surfaced as "Error:"-prefixed
                // strings, so the prefix doubles as the success flag.
                let success = !content.starts_with("Error:");

                // Publish full tool output to bus for training pipeline
                // (before RLM truncation so we capture the complete output)
                if let Some(ref bus) = self.bus {
                    let handle = bus.handle(&self.agent);
                    handle.send(
                        format!("agent.{}.tool.output", self.agent),
                        crate::bus::BusMessage::ToolOutputFull {
                            agent_id: self.agent.clone(),
                            tool_name: tool_name.clone(),
                            output: content.clone(),
                            success,
                            step,
                        },
                    );
                }

                // Emit event stream event for audit/compliance (SOC 2, FedRAMP, ATO)
                if let Some(base_dir) = Self::event_stream_path() {
                    let workspace = std::env::var("PWD")
                        .map(PathBuf::from)
                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
                    let event = ChatEvent::tool_result(
                        workspace,
                        self.id.clone(),
                        &tool_name,
                        success,
                        duration_ms,
                        &content,
                        self.messages.len() as u64,
                    );
                    let event_json = event.to_json();
                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
                    let seq = self.messages.len() as u64;
                    // Zero-padded sequence range in the name keeps event files
                    // lexicographically ordered in listings.
                    let filename = format!(
                        "{}-chat-events-{:020}-{:020}.jsonl",
                        timestamp,
                        seq * 10000,
                        (seq + 1) * 10000
                    );
                    let event_path = base_dir.join(&self.id).join(filename);

                    // Fire-and-forget append: event logging must not block or
                    // fail the agent loop, so all I/O errors are ignored.
                    let event_path_clone = event_path;
                    tokio::spawn(async move {
                        if let Some(parent) = event_path_clone.parent() {
                            let _ = tokio::fs::create_dir_all(parent).await;
                        }
                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
                            .create(true)
                            .append(true)
                            .open(&event_path_clone)
                            .await
                        {
                            use tokio::io::AsyncWriteExt;
                            let _ = file.write_all(event_json.as_bytes()).await;
                            let _ = file.write_all(b"\n").await;
                        }
                    });
                }

                // Route large tool outputs through RLM
                let content = {
                    let ctx_window = context_window_for_model(&model);
                    // Rough context estimate: sum visible text/tool-result
                    // bytes across the history.
                    let total_chars: usize = self
                        .messages
                        .iter()
                        .map(|m| {
                            m.content
                                .iter()
                                .map(|p| match p {
                                    ContentPart::Text { text } => text.len(),
                                    ContentPart::ToolResult { content, .. } => content.len(),
                                    _ => 0,
                                })
                                .sum::<usize>()
                        })
                        .sum();
                    let current_tokens = total_chars / 4; // ~4 chars per token
                    let routing_ctx = RoutingContext {
                        tool_id: tool_name.clone(),
                        session_id: self.id.clone(),
                        call_id: Some(tool_id.clone()),
                        model_context_limit: ctx_window,
                        current_context_tokens: Some(current_tokens),
                    };
                    let rlm_config = RlmConfig::default();
                    let routing = RlmRouter::should_route(&content, &routing_ctx, &rlm_config);
                    if routing.should_route {
                        tracing::info!(
                            tool = %tool_name,
                            reason = %routing.reason,
                            estimated_tokens = routing.estimated_tokens,
                            "RLM: Routing large tool output"
                        );
                        let auto_ctx = AutoProcessContext {
                            tool_id: &tool_name,
                            tool_args: tool_input.clone(),
                            session_id: &self.id,
                            abort: None,
                            on_progress: None,
                            provider: Arc::clone(&provider),
                            model: model.clone(),
                        };
                        match RlmRouter::auto_process(&content, auto_ctx, &rlm_config).await {
                            Ok(result) => {
                                tracing::info!(
                                    input_tokens = result.stats.input_tokens,
                                    output_tokens = result.stats.output_tokens,
                                    iterations = result.stats.iterations,
                                    "RLM: Processing complete"
                                );
                                result.processed
                            }
                            Err(e) => {
                                tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
                                let (truncated, _, _) = RlmRouter::smart_truncate(
                                    &content,
                                    &tool_name,
                                    &tool_input,
                                    ctx_window / 4,
                                );
                                truncated
                            }
                        }
                    } else {
                        content
                    }
                };

                // Add tool result message
                self.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
            }
        }

        // Save session after each prompt to persist messages
        self.save().await?;

        // Archive event stream to S3/R2 if configured (for compliance: SOC 2, FedRAMP, ATO)
        self.archive_event_stream_to_s3().await;

        Ok(SessionResult {
            text: final_output.trim().to_string(),
            session_id: self.id.clone(),
        })
    }
769
770    /// Archive event stream files to S3/R2 for immutable compliance logging
771    async fn archive_event_stream_to_s3(&self) {
772        // Check if S3 is configured
773        if !S3Sink::is_configured() {
774            return;
775        }
776
777        let Some(base_dir) = Self::event_stream_path() else {
778            return;
779        };
780
781        let session_event_dir = base_dir.join(&self.id);
782        if !session_event_dir.exists() {
783            return;
784        }
785
786        // Try to create S3 sink
787        let Ok(sink) = S3Sink::from_env().await else {
788            tracing::warn!("Failed to create S3 sink for archival");
789            return;
790        };
791
792        // Upload all event files in the session directory
793        let session_id = self.id.clone();
794        tokio::spawn(async move {
795            if let Ok(mut entries) = tokio::fs::read_dir(&session_event_dir).await {
796                while let Ok(Some(entry)) = entries.next_entry().await {
797                    let path = entry.path();
798                    if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
799                        match sink.upload_file(&path, &session_id).await {
800                            Ok(url) => {
801                                tracing::info!(url = %url, "Archived event stream to S3/R2");
802                            }
803                            Err(e) => {
804                                tracing::warn!(error = %e, "Failed to archive event file to S3");
805                            }
806                        }
807                    }
808                }
809            }
810        });
811    }
812
813    /// Process a user message with real-time event streaming for UI updates.
814    /// Events are sent through the provided channel as tool calls execute.
815    ///
816    /// Accepts a pre-loaded `ProviderRegistry` to avoid re-fetching secrets
817    /// from Vault on every message (which was the primary TUI performance
818    /// bottleneck).
819    pub async fn prompt_with_events(
820        &mut self,
821        message: &str,
822        event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
823        registry: std::sync::Arc<crate::provider::ProviderRegistry>,
824    ) -> Result<SessionResult> {
825        self.prompt_with_events_and_images(message, Vec::new(), event_tx, registry)
826            .await
827    }
828
    /// Execute a prompt with optional image attachments and stream events.
    ///
    /// This is the full-featured version that supports multimodal input (text + images).
    /// Images should be base64-encoded data URLs.
    ///
    /// High-level flow:
    /// 1. Select a provider/model (explicit session metadata first, then the
    ///    deterministic fallback ordering in `choose_default_provider`).
    /// 2. Append the user message, compressing it via RLM when oversized.
    /// 3. Run the agent loop (up to `max_steps`): completion -> tool calls ->
    ///    tool results, emitting `SessionEvent`s for the TUI along the way.
    /// 4. Persist the session, archive the event stream, and return the
    ///    concatenated assistant text.
    ///
    /// # Errors
    /// Fails when no providers are configured, when the selected provider is
    /// missing from the registry, when a completion request fails, or when
    /// saving the session fails. Event-channel send failures are deliberately
    /// ignored (the receiver may have gone away).
    pub async fn prompt_with_events_and_images(
        &mut self,
        message: &str,
        images: Vec<ImageAttachment>,
        event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
        registry: std::sync::Arc<crate::provider::ProviderRegistry>,
    ) -> Result<SessionResult> {
        use crate::provider::{CompletionRequest, ContentPart, Role, parse_model_string};

        // Let the UI show a spinner immediately.
        let _ = event_tx.send(SessionEvent::Thinking).await;

        let providers = registry.list();
        if providers.is_empty() {
            anyhow::bail!(
                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
            );
        }
        tracing::info!("Available providers: {:?}", providers);

        // Parse model string (format: "provider/model", "provider", or just "model").
        // "zhipuai" is normalized to its registry alias "zai".
        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
            let (prov, model) = parse_model_string(model_str);
            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
            if prov.is_some() {
                (prov.map(|s| s.to_string()), model.to_string())
            } else if providers.contains(&model) {
                // Bare provider name given, no model part.
                (Some(model.to_string()), String::new())
            } else {
                // Bare model name given; provider resolved by fallback below.
                (None, model.to_string())
            }
        } else {
            (None, String::new())
        };

        // Determine which provider to use with deterministic fallback ordering.
        let selected_provider = provider_name
            .as_deref()
            .filter(|p| providers.contains(p))
            .or_else(|| choose_default_provider(providers.as_slice()))
            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;

        let provider = registry
            .get(selected_provider)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;

        // Build user message content parts (text + optional images)
        let mut content_parts = vec![ContentPart::Text {
            text: message.to_string(),
        }];

        // Add image attachments
        for img in &images {
            content_parts.push(ContentPart::Image {
                url: img.data_url.clone(),
                mime_type: img.mime_type.clone(),
            });
        }

        if !images.is_empty() {
            tracing::info!(
                image_count = images.len(),
                "Adding {} image attachment(s) to user message",
                images.len()
            );
        }

        // Add user message
        self.add_message(Message {
            role: Role::User,
            content: content_parts,
        });

        // Generate title if needed (generate_title is a no-op when set).
        if self.title.is_none() {
            self.generate_title().await?;
        }

        // Determine model: explicit model id wins, otherwise the provider's default.
        let model = if !model_id.is_empty() {
            model_id
        } else {
            Self::default_model_for_provider(selected_provider)
        };

        // Compress oversized user message via RLM if it exceeds the context threshold
        {
            let ctx_window = context_window_for_model(&model);
            let msg_tokens = RlmChunker::estimate_tokens(message);
            // Trigger compression above 35% of the model's context window.
            let threshold = (ctx_window as f64 * 0.35) as usize;
            if msg_tokens > threshold {
                tracing::info!(
                    msg_tokens,
                    threshold,
                    ctx_window,
                    "RLM: User message exceeds context threshold, compressing"
                );
                let auto_ctx = AutoProcessContext {
                    tool_id: "session_context",
                    tool_args: serde_json::json!({}),
                    session_id: &self.id,
                    abort: None,
                    on_progress: None,
                    provider: Arc::clone(&provider),
                    model: model.clone(),
                };
                let rlm_config = RlmConfig::default();
                match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
                    Ok(result) => {
                        tracing::info!(
                            input_tokens = result.stats.input_tokens,
                            output_tokens = result.stats.output_tokens,
                            "RLM: User message compressed"
                        );
                        // Replace the just-appended user message with the
                        // compressed form plus a short prefix of the original.
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text {
                                text: format!(
                                    "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
                                    msg_tokens,
                                    result.processed,
                                    message.chars().take(500).collect::<String>()
                                ),
                            }];
                        }
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
                        // NOTE(review): max_chars / 4 == threshold, so compress()
                        // appears to take a token budget here — confirm its contract.
                        let max_chars = threshold * 4;
                        let truncated = RlmChunker::compress(message, max_chars / 4, None);
                        if let Some(last) = self.messages.last_mut() {
                            last.content = vec![ContentPart::Text { text: truncated }];
                        }
                    }
                }
            }
        }

        // Create tool registry; interactive tools (e.g. "question") are
        // excluded because this loop cannot pause for user input.
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|tool| !is_interactive_tool(&tool.name))
            .collect();

        // Some model families require temperature exactly 1.0.
        let temperature = if prefers_temperature_one(&model) {
            Some(1.0)
        } else {
            Some(0.7)
        };

        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
        tracing::info!("Available tools: {}", tool_definitions.len());

        // All current providers support native tool calling.  Hardcode to
        // true so we skip the expensive list_models() API call on every message.

        let model_supports_tools = true;

        // Build system prompt rooted at the working directory.
        // NOTE(review): prefers $PWD over current_dir(), presumably to keep
        // symlinked paths as the shell sees them — confirm.
        let cwd = std::env::var("PWD")
            .map(std::path::PathBuf::from)
            .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);

        let mut final_output = String::new();
        // Hard cap on completion/tool-call rounds to prevent runaway loops.
        let max_steps = 50;

        // Initialise the FunctionGemma tool-call router (feature-gated, opt-in).

        let tool_router: Option<ToolCallRouter> = {
            let cfg = ToolRouterConfig::from_env();
            match ToolCallRouter::from_config(&cfg) {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
                    None
                }
            }
        };

        for step in 1..=max_steps {
            tracing::info!(step = step, "Agent step starting");
            let _ = event_tx.send(SessionEvent::Thinking).await;

            // Build messages with system prompt first
            let mut messages = vec![Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: system_prompt.clone(),
                }],
            }];
            messages.extend(self.messages.clone());

            let request = CompletionRequest {
                messages,
                tools: tool_definitions.clone(),
                model: model.clone(),
                temperature,
                top_p: None,
                max_tokens: Some(8192),
                stop: Vec::new(),
            };

            let llm_start = std::time::Instant::now();
            let response = provider.complete(request).await?;
            let llm_duration_ms = llm_start.elapsed().as_millis() as u64;

            // Optionally route text-only responses through FunctionGemma to
            // produce structured tool calls.  Skipped for native tool-calling models.

            let response = if let Some(ref router) = tool_router {
                router
                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
                    .await
            } else {
                response
            };

            // Record per-model token usage for telemetry.
            crate::telemetry::TOKEN_USAGE.record_model_usage(
                &model,
                response.usage.prompt_tokens as u64,
                response.usage.completion_tokens as u64,
            );

            // Emit usage report for TUI display
            let _ = event_tx
                .send(SessionEvent::UsageReport {
                    prompt_tokens: response.usage.prompt_tokens,
                    completion_tokens: response.usage.completion_tokens,
                    duration_ms: llm_duration_ms,
                    model: model.clone(),
                })
                .await;

            // Extract tool calls; unparseable argument JSON degrades to {}.
            let tool_calls: Vec<(String, String, serde_json::Value)> = response
                .message
                .content
                .iter()
                .filter_map(|part| {
                    if let ContentPart::ToolCall {
                        id,
                        name,
                        arguments,
                        ..
                    } = part
                    {
                        let args: serde_json::Value =
                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
                        Some((id.clone(), name.clone(), args))
                    } else {
                        None
                    }
                })
                .collect();

            // Collect thinking and visible text output for this step.
            let mut thinking_text = String::new();
            let mut step_text = String::new();
            for part in &response.message.content {
                match part {
                    ContentPart::Thinking { text } => {
                        if !text.is_empty() {
                            thinking_text.push_str(text);
                            thinking_text.push('\n');
                        }
                    }
                    ContentPart::Text { text } => {
                        if !text.is_empty() {
                            step_text.push_str(text);
                            step_text.push('\n');
                        }
                    }
                    _ => {}
                }
            }

            // Emit thinking output first, and mirror it onto the bus if present.
            if !thinking_text.trim().is_empty() {
                let _ = event_tx
                    .send(SessionEvent::ThinkingComplete(
                        thinking_text.trim().to_string(),
                    ))
                    .await;
                if let Some(ref bus) = self.bus {
                    let handle = bus.handle(&self.agent);
                    handle.send(
                        format!("agent.{}.thinking", self.agent),
                        crate::bus::BusMessage::AgentThinking {
                            agent_id: self.agent.clone(),
                            thinking: thinking_text.trim().to_string(),
                            step,
                        },
                    );
                }
            }

            // Emit this step's text BEFORE tool calls so it appears in correct
            // chronological order in the TUI chat display.
            if !step_text.trim().is_empty() {
                let trimmed = step_text.trim().to_string();
                let _ = event_tx
                    .send(SessionEvent::TextChunk(trimmed.clone()))
                    .await;
                let _ = event_tx.send(SessionEvent::TextComplete(trimmed)).await;
                final_output.push_str(&step_text);
            }

            // No tool calls means the model is done: record the reply and exit.
            if tool_calls.is_empty() {
                self.add_message(response.message.clone());
                break;
            }

            self.add_message(response.message.clone());

            tracing::info!(
                step = step,
                num_tools = tool_calls.len(),
                "Executing tool calls"
            );

            // Execute each tool call with events
            for (tool_id, tool_name, tool_input) in tool_calls {
                let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
                let _ = event_tx
                    .send(SessionEvent::ToolCallStart {
                        name: tool_name.clone(),
                        arguments: args_str,
                    })
                    .await;

                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");

                // Publish tool request to bus for training pipeline
                if let Some(ref bus) = self.bus {
                    let handle = bus.handle(&self.agent);
                    handle.send(
                        format!("agent.{}.tool.request", self.agent),
                        crate::bus::BusMessage::ToolRequest {
                            request_id: tool_id.clone(),
                            agent_id: self.agent.clone(),
                            tool_name: tool_name.clone(),
                            arguments: tool_input.clone(),
                        },
                    );
                }

                // Interactive tools are filtered from definitions, but the model
                // may still hallucinate a call; reject it with an explanation.
                if is_interactive_tool(&tool_name) {
                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
                    let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
                    let _ = event_tx
                        .send(SessionEvent::ToolCallComplete {
                            name: tool_name.clone(),
                            output: content.clone(),
                            success: false,
                        })
                        .await;
                    self.add_message(Message {
                        role: Role::Tool,
                        content: vec![ContentPart::ToolResult {
                            tool_call_id: tool_id,
                            content,
                        }],
                    });
                    continue;
                }

                // Execute the tool; every outcome (success, error, unknown tool)
                // is audit-logged with the session id for compliance correlation.
                let exec_start = std::time::Instant::now();
                let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
                    match tool.execute(tool_input.clone()).await {
                        Ok(result) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                        AuditCategory::ToolExecution,
                                        format!("tool:{}", tool_name),
                                        if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
                                        None,
                                        Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
                                        None,  // okr_id
                                        None,  // okr_run_id
                                        None,  // relay_id
                                        Some(self.id.clone()),  // session_id
                                    ).await;
                            }
                            (result.output, result.success)
                        }
                        Err(e) => {
                            let duration_ms = exec_start.elapsed().as_millis() as u64;
                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
                            if let Some(audit) = try_audit_log() {
                                audit.log_with_correlation(
                                        AuditCategory::ToolExecution,
                                        format!("tool:{}", tool_name),
                                        AuditOutcome::Failure,
                                        None,
                                        Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
                                        None,  // okr_id
                                        None,  // okr_run_id
                                        None,  // relay_id
                                        Some(self.id.clone()),  // session_id
                                    ).await;
                            }
                            (format!("Error: {}", e), false)
                        }
                    }
                } else {
                    tracing::warn!(tool = %tool_name, "Tool not found");
                    if let Some(audit) = try_audit_log() {
                        audit
                            .log_with_correlation(
                                AuditCategory::ToolExecution,
                                format!("tool:{}", tool_name),
                                AuditOutcome::Failure,
                                None,
                                Some(json!({ "error": "unknown_tool" })),
                                None,                  // okr_id
                                None,                  // okr_run_id
                                None,                  // relay_id
                                Some(self.id.clone()), // session_id
                            )
                            .await;
                    }
                    (format!("Error: Unknown tool '{}'", tool_name), false)
                };

                // Total tool duration, measured from just before execution started.
                let duration_ms = exec_start.elapsed().as_millis() as u64;

                // Publish full tool output to bus for training pipeline
                if let Some(ref bus) = self.bus {
                    let handle = bus.handle(&self.agent);
                    handle.send(
                        format!("agent.{}.tool.output", self.agent),
                        crate::bus::BusMessage::ToolOutputFull {
                            agent_id: self.agent.clone(),
                            tool_name: tool_name.clone(),
                            output: content.clone(),
                            success,
                            step,
                        },
                    );
                }

                // Emit event stream event for audit/compliance (SOC 2, FedRAMP, ATO)
                // This creates the structured JSONL record with byte-range offsets
                // File format: {timestamp}-chat-events-{start_byte}-{end_byte}.jsonl
                if let Some(base_dir) = Self::event_stream_path() {
                    let workspace = std::env::var("PWD")
                        .map(PathBuf::from)
                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
                    let event = ChatEvent::tool_result(
                        workspace,
                        self.id.clone(),
                        &tool_name,
                        success,
                        duration_ms,
                        &content,
                        self.messages.len() as u64,
                    );
                    let event_json = event.to_json();
                    let event_size = event_json.len() as u64 + 1; // +1 for newline

                    // Generate filename with byte-range offsets for random access replay
                    // Format: {timestamp}-chat-events-{start_offset}-{end_offset}.jsonl
                    // We use a session-scoped counter stored in metadata for byte tracking.
                    // NOTE(review): offsets are approximations (seq * 10000), not real
                    // byte positions — replay tooling must not rely on them being exact.
                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
                    let seq = self.messages.len() as u64;
                    let filename = format!(
                        "{}-chat-events-{:020}-{:020}.jsonl",
                        timestamp,
                        seq * 10000,       // approximated start offset
                        (seq + 1) * 10000  // approximated end offset
                    );
                    let event_path = base_dir.join(&self.id).join(filename);

                    // Fire-and-forget: don't block tool execution on event logging
                    let event_path_clone = event_path;
                    tokio::spawn(async move {
                        if let Some(parent) = event_path_clone.parent() {
                            let _ = tokio::fs::create_dir_all(parent).await;
                        }
                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
                            .create(true)
                            .append(true)
                            .open(&event_path_clone)
                            .await
                        {
                            use tokio::io::AsyncWriteExt;
                            let _ = file.write_all(event_json.as_bytes()).await;
                            let _ = file.write_all(b"\n").await;
                            tracing::debug!(path = %event_path_clone.display(), size = event_size, "Event stream wrote");
                        }
                    });
                }

                // The TUI sees the raw (pre-RLM) tool output; only the message
                // history stored below may hold the compressed form.
                let _ = event_tx
                    .send(SessionEvent::ToolCallComplete {
                        name: tool_name.clone(),
                        output: content.clone(),
                        success,
                    })
                    .await;

                // Route large tool outputs through RLM
                let content = {
                    let ctx_window = context_window_for_model(&model);
                    // Rough size of the conversation so far, counting only text
                    // and tool-result parts.
                    let total_chars: usize = self
                        .messages
                        .iter()
                        .map(|m| {
                            m.content
                                .iter()
                                .map(|p| match p {
                                    ContentPart::Text { text } => text.len(),
                                    ContentPart::ToolResult { content, .. } => content.len(),
                                    _ => 0,
                                })
                                .sum::<usize>()
                        })
                        .sum();
                    // Heuristic: ~4 chars per token.
                    let current_tokens = total_chars / 4;
                    let routing_ctx = RoutingContext {
                        tool_id: tool_name.clone(),
                        session_id: self.id.clone(),
                        call_id: Some(tool_id.clone()),
                        model_context_limit: ctx_window,
                        current_context_tokens: Some(current_tokens),
                    };
                    let rlm_config = RlmConfig::default();
                    let routing = RlmRouter::should_route(&content, &routing_ctx, &rlm_config);
                    if routing.should_route {
                        tracing::info!(
                            tool = %tool_name,
                            reason = %routing.reason,
                            estimated_tokens = routing.estimated_tokens,
                            "RLM: Routing large tool output"
                        );
                        let auto_ctx = AutoProcessContext {
                            tool_id: &tool_name,
                            tool_args: tool_input.clone(),
                            session_id: &self.id,
                            abort: None,
                            on_progress: None,
                            provider: Arc::clone(&provider),
                            model: model.clone(),
                        };
                        match RlmRouter::auto_process(&content, auto_ctx, &rlm_config).await {
                            Ok(result) => {
                                tracing::info!(
                                    input_tokens = result.stats.input_tokens,
                                    output_tokens = result.stats.output_tokens,
                                    iterations = result.stats.iterations,
                                    "RLM: Processing complete"
                                );
                                result.processed
                            }
                            Err(e) => {
                                // Fall back to lossy truncation at a quarter of
                                // the context window.
                                tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
                                let (truncated, _, _) = RlmRouter::smart_truncate(
                                    &content,
                                    &tool_name,
                                    &tool_input,
                                    ctx_window / 4,
                                );
                                truncated
                            }
                        }
                    } else {
                        content
                    }
                };

                // Record the (possibly compressed) tool result in history.
                self.add_message(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: tool_id,
                        content,
                    }],
                });
            }
        }

        // Persist messages so the conversation survives process restarts.
        self.save().await?;

        // Archive event stream to S3/R2 if configured (for compliance: SOC 2, FedRAMP, ATO)
        self.archive_event_stream_to_s3().await;

        // Text was already sent per-step via TextComplete events.
        // Send updated session state so the caller can sync back.
        let _ = event_tx.send(SessionEvent::SessionSync(self.clone())).await;
        let _ = event_tx.send(SessionEvent::Done).await;

        Ok(SessionResult {
            text: final_output.trim().to_string(),
            session_id: self.id.clone(),
        })
    }
1433
1434    /// Generate a title for the session based on the first message
1435    /// Only sets title if not already set (for initial title generation)
1436    pub async fn generate_title(&mut self) -> Result<()> {
1437        if self.title.is_some() {
1438            return Ok(());
1439        }
1440
1441        // Get first user message
1442        let first_message = self
1443            .messages
1444            .iter()
1445            .find(|m| m.role == crate::provider::Role::User);
1446
1447        if let Some(msg) = first_message {
1448            let text: String = msg
1449                .content
1450                .iter()
1451                .filter_map(|p| match p {
1452                    crate::provider::ContentPart::Text { text } => Some(text.clone()),
1453                    _ => None,
1454                })
1455                .collect::<Vec<_>>()
1456                .join(" ");
1457
1458            // Truncate to reasonable length
1459            self.title = Some(truncate_with_ellipsis(&text, 47));
1460        }
1461
1462        Ok(())
1463    }
1464
1465    /// Regenerate the title based on the first message, even if already set
1466    /// Use this for on-demand title updates or after context changes
1467    pub async fn regenerate_title(&mut self) -> Result<()> {
1468        // Get first user message
1469        let first_message = self
1470            .messages
1471            .iter()
1472            .find(|m| m.role == crate::provider::Role::User);
1473
1474        if let Some(msg) = first_message {
1475            let text: String = msg
1476                .content
1477                .iter()
1478                .filter_map(|p| match p {
1479                    crate::provider::ContentPart::Text { text } => Some(text.clone()),
1480                    _ => None,
1481                })
1482                .collect::<Vec<_>>()
1483                .join(" ");
1484
1485            // Truncate to reasonable length
1486            self.title = Some(truncate_with_ellipsis(&text, 47));
1487        }
1488
1489        Ok(())
1490    }
1491
1492    /// Set a custom title for the session
1493    pub fn set_title(&mut self, title: impl Into<String>) {
1494        self.title = Some(title.into());
1495        self.updated_at = Utc::now();
1496    }
1497
1498    /// Clear the title, allowing it to be regenerated
1499    pub fn clear_title(&mut self) {
1500        self.title = None;
1501        self.updated_at = Utc::now();
1502    }
1503
1504    /// Handle context change - updates metadata and optionally regenerates title
1505    /// Call this when the session context changes (e.g., directory change, model change)
1506    pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
1507        self.updated_at = Utc::now();
1508
1509        if regenerate_title {
1510            self.regenerate_title().await?;
1511        }
1512
1513        Ok(())
1514    }
1515
1516    /// Delete a session by ID
1517    pub async fn delete(id: &str) -> Result<()> {
1518        let path = Self::session_path(id)?;
1519        if path.exists() {
1520            tokio::fs::remove_file(&path).await?;
1521        }
1522        Ok(())
1523    }
1524
1525    /// Get the sessions directory
1526    fn sessions_dir() -> Result<PathBuf> {
1527        crate::config::Config::data_dir()
1528            .map(|d| d.join("sessions"))
1529            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
1530    }
1531
1532    /// Get the path for a session file
1533    fn session_path(id: &str) -> Result<PathBuf> {
1534        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
1535    }
1536
1537    /// Get the event stream path from environment, if configured.
1538    /// Returns None if CODETETHER_EVENT_STREAM_PATH is not set.
1539    fn event_stream_path() -> Option<PathBuf> {
1540        std::env::var("CODETETHER_EVENT_STREAM_PATH")
1541            .ok()
1542            .map(PathBuf::from)
1543    }
1544}
1545
/// Result from a session prompt
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionResult {
    /// Final accumulated text output (whitespace-trimmed).
    pub text: String,
    /// ID of the session that produced this result.
    pub session_id: String,
}
1552
/// Events emitted during session processing for real-time UI updates
#[derive(Debug, Clone)]
pub enum SessionEvent {
    /// Agent is thinking/processing
    Thinking,
    /// Tool call started
    ToolCallStart { name: String, arguments: String },
    /// Tool call completed with result
    ToolCallComplete {
        name: String,
        output: String,
        success: bool,
    },
    /// Partial text output (for streaming)
    TextChunk(String),
    /// Final text output
    TextComplete(String),
    /// Model thinking/reasoning output
    ThinkingComplete(String),
    /// Token usage for one LLM round-trip
    UsageReport {
        /// Tokens consumed by the prompt for this round-trip.
        prompt_tokens: usize,
        /// Tokens generated by the model for this round-trip.
        completion_tokens: usize,
        /// Round-trip duration in milliseconds.
        duration_ms: u64,
        /// Model identifier used for this round-trip.
        model: String,
    },
    /// Updated session state for caller to sync back
    SessionSync(Session),
    /// Processing complete
    Done,
    /// Error occurred
    Error(String),
}
1586
1587/// List all sessions
1588pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
1589    let sessions_dir = crate::config::Config::data_dir()
1590        .map(|d| d.join("sessions"))
1591        .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
1592
1593    if !sessions_dir.exists() {
1594        return Ok(Vec::new());
1595    }
1596
1597    let mut summaries = Vec::new();
1598    let mut entries = fs::read_dir(&sessions_dir).await?;
1599
1600    while let Some(entry) = entries.next_entry().await? {
1601        let path = entry.path();
1602        if path.extension().map(|e| e == "json").unwrap_or(false) {
1603            if let Ok(content) = fs::read_to_string(&path).await {
1604                if let Ok(session) = serde_json::from_str::<Session>(&content) {
1605                    summaries.push(SessionSummary {
1606                        id: session.id,
1607                        title: session.title,
1608                        created_at: session.created_at,
1609                        updated_at: session.updated_at,
1610                        message_count: session.messages.len(),
1611                        agent: session.agent,
1612                        directory: session.metadata.directory,
1613                    });
1614                }
1615            }
1616        }
1617    }
1618
1619    summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1620    Ok(summaries)
1621}
1622
1623/// List sessions scoped to a specific directory (workspace)
1624///
1625/// Only returns sessions whose `metadata.directory` matches the given path.
1626/// This prevents sessions from other workspaces "leaking" into the TUI.
1627pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1628    let all = list_sessions().await?;
1629    let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
1630    Ok(all
1631        .into_iter()
1632        .filter(|s| {
1633            s.directory
1634                .as_ref()
1635                .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
1636                .unwrap_or(false)
1637        })
1638        .collect())
1639}
1640
1641/// List sessions for a directory with pagination.
1642///
1643/// - `limit`: Maximum number of sessions to return (default: 100)
1644/// - `offset`: Number of sessions to skip (default: 0)
1645pub async fn list_sessions_paged(
1646    dir: &std::path::Path,
1647    limit: usize,
1648    offset: usize,
1649) -> Result<Vec<SessionSummary>> {
1650    let mut sessions = list_sessions_for_directory(dir).await?;
1651    sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1652    Ok(sessions.into_iter().skip(offset).take(limit).collect())
1653}
1654
/// Summary of a session for listing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Unique session identifier (filename stem of the session file).
    pub id: String,
    /// Human-readable title, if one was generated or set.
    pub title: Option<String>,
    /// When the session was created.
    pub created_at: DateTime<Utc>,
    /// When the session was last modified.
    pub updated_at: DateTime<Utc>,
    /// Number of messages in the session's conversation history.
    pub message_count: usize,
    /// Name of the agent the session was created with.
    pub agent: String,
    /// The working directory this session was created in
    // `#[serde(default)]` keeps older session files (without this field)
    // deserializable.
    #[serde(default)]
    pub directory: Option<PathBuf>,
}
1668
/// Truncate `value` to at most `max_chars` characters, appending `"..."`
/// when anything was cut off. Counts `char`s (not bytes), so multi-byte
/// UTF-8 input is never split mid-character. `max_chars == 0` yields an
/// empty string.
fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // `nth(max_chars)` yields the (max_chars + 1)-th character, if any.
    // Its byte offset is exactly where the first `max_chars` chars end,
    // so we can slice instead of rebuilding the string char by char.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => format!("{}...", &value[..cut]),
        // `value` has at most `max_chars` characters: return it unchanged.
        None => value.to_string(),
    }
}
1690
// Async stream-to-Vec collection helper - kept for potential future use
1692#[allow(dead_code)]
1693use futures::StreamExt;
1694
// Minimal trait exposing a `collect`-into-`Vec` operation for async sources.
#[allow(dead_code)]
trait AsyncCollect<T> {
    /// Drain the source to completion, gathering every item into a `Vec`
    /// in the order produced.
    async fn collect(self) -> Vec<T>;
}
1699
1700#[allow(dead_code)]
1701impl<S, T> AsyncCollect<T> for S
1702where
1703    S: futures::Stream<Item = T> + Unpin,
1704{
1705    async fn collect(mut self) -> Vec<T> {
1706        let mut items = Vec::new();
1707        while let Some(item) = self.next().await {
1708            items.push(item);
1709        }
1710        items
1711    }
1712}