Skip to main content

codetether_agent/session/
mod.rs

1//! Session management
2//!
3//! Sessions track the conversation history and state for agent interactions.
4
5use crate::agent::ToolUse;
6use crate::audit::{AuditCategory, AuditOutcome, try_audit_log};
7use crate::event_stream::ChatEvent;
8use crate::event_stream::s3_sink::S3Sink;
9use crate::provider::{Message, Usage};
10use crate::rlm::{RlmChunker, RlmConfig, RlmRouter, RoutingContext};
11use crate::rlm::router::AutoProcessContext;
12use crate::tool::ToolRegistry;
13use anyhow::Result;
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::path::PathBuf;
18use std::sync::Arc;
19use tokio::fs;
20use uuid::Uuid;
21
22#[cfg(feature = "functiongemma")]
23use crate::cognition::tool_router::{ToolCallRouter, ToolRouterConfig};
24
25fn is_interactive_tool(tool_name: &str) -> bool {
26    matches!(tool_name, "question")
27}
28
29fn choose_default_provider<'a>(providers: &'a [&'a str]) -> Option<&'a str> {
30    // Keep Google as an explicit option, but don't default to it first because
31    // some environments expose API keys that are not valid for ChatCompletions.
32    let preferred = [
33        "zai",
34        "openai",
35        "github-copilot",
36        "anthropic",
37        "minimax",
38        "openrouter",
39        "novita",
40        "moonshotai",
41        "google",
42    ];
43    for name in preferred {
44        if let Some(found) = providers.iter().copied().find(|p| *p == name) {
45            return Some(found);
46        }
47    }
48    providers.first().copied()
49}
50
51fn prefers_temperature_one(model: &str) -> bool {
52    let normalized = model.to_ascii_lowercase();
53    normalized.contains("kimi-k2") || normalized.contains("glm-") || normalized.contains("minimax")
54}
55
56/// Return the context window size (in tokens) for known models.
57fn context_window_for_model(model: &str) -> usize {
58    let m = model.to_ascii_lowercase();
59    if m.contains("kimi-k2") {
60        256_000
61    } else if m.contains("glm-5") || m.contains("glm5") {
62        200_000
63    } else if m.contains("gpt-4o") {
64        128_000
65    } else if m.contains("gpt-5") {
66        256_000
67    } else if m.contains("claude") {
68        200_000
69    } else if m.contains("gemini") {
70        1_000_000
71    } else if m.contains("minimax") || m.contains("m2.5") {
72        256_000
73    } else if m.contains("qwen") {
74        131_072
75    } else {
76        128_000 // conservative default
77    }
78}
79
80/// A conversation session
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct Session {
83    pub id: String,
84    pub title: Option<String>,
85    pub created_at: DateTime<Utc>,
86    pub updated_at: DateTime<Utc>,
87    pub messages: Vec<Message>,
88    pub tool_uses: Vec<ToolUse>,
89    pub usage: Usage,
90    pub agent: String,
91    pub metadata: SessionMetadata,
92}
93
94#[derive(Debug, Clone, Default, Serialize, Deserialize)]
95pub struct SessionMetadata {
96    pub directory: Option<PathBuf>,
97    pub model: Option<String>,
98    pub shared: bool,
99    pub share_url: Option<String>,
100}
101
102impl Session {
103    fn default_model_for_provider(provider: &str) -> String {
104        match provider {
105            "moonshotai" => "kimi-k2.5".to_string(),
106            "anthropic" => "claude-sonnet-4-20250514".to_string(),
107            "minimax" => "MiniMax-M2.5".to_string(),
108            "openai" => "gpt-4o".to_string(),
109            "google" => "gemini-2.5-pro".to_string(),
110            "zhipuai" | "zai" => "glm-5".to_string(),
111            // OpenRouter uses model IDs like "z-ai/glm-5".
112            "openrouter" => "z-ai/glm-5".to_string(),
113            "novita" => "qwen/qwen3-coder-next".to_string(),
114            "github-copilot" | "github-copilot-enterprise" => "gpt-5-mini".to_string(),
115            _ => "glm-5".to_string(),
116        }
117    }
118
119    /// Create a new session
120    pub async fn new() -> Result<Self> {
121        let id = Uuid::new_v4().to_string();
122        let now = Utc::now();
123
124        Ok(Self {
125            id,
126            title: None,
127            created_at: now,
128            updated_at: now,
129            messages: Vec::new(),
130            tool_uses: Vec::new(),
131            usage: Usage::default(),
132            agent: "build".to_string(),
133            metadata: SessionMetadata {
134                directory: Some(std::env::current_dir()?),
135                ..Default::default()
136            },
137        })
138    }
139
140    /// Load an existing session
141    pub async fn load(id: &str) -> Result<Self> {
142        let path = Self::session_path(id)?;
143        let content = fs::read_to_string(&path).await?;
144        let session: Session = serde_json::from_str(&content)?;
145        Ok(session)
146    }
147
148    /// Load the last session, optionally scoped to a workspace directory
149    ///
150    /// When `workspace` is Some, only considers sessions created in that directory.
151    /// When None, returns the most recent session globally (legacy behavior).
152    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
153        let sessions_dir = Self::sessions_dir()?;
154
155        if !sessions_dir.exists() {
156            anyhow::bail!("No sessions found");
157        }
158
159        let mut entries: Vec<tokio::fs::DirEntry> = Vec::new();
160        let mut read_dir = fs::read_dir(&sessions_dir).await?;
161        while let Some(entry) = read_dir.next_entry().await? {
162            entries.push(entry);
163        }
164
165        if entries.is_empty() {
166            anyhow::bail!("No sessions found");
167        }
168
169        // Sort by modification time (most recent first)
170        // Use std::fs::metadata since we can't await in sort_by_key
171        entries.sort_by_key(|e| {
172            std::cmp::Reverse(
173                std::fs::metadata(e.path())
174                    .ok()
175                    .and_then(|m| m.modified().ok())
176                    .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
177            )
178        });
179
180        let canonical_workspace =
181            workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
182
183        for entry in &entries {
184            let content: String = fs::read_to_string(entry.path()).await?;
185            if let Ok(session) = serde_json::from_str::<Session>(&content) {
186                // If workspace scoping requested, filter by directory
187                if let Some(ref ws) = canonical_workspace {
188                    if let Some(ref dir) = session.metadata.directory {
189                        let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
190                        if &canonical_dir == ws {
191                            return Ok(session);
192                        }
193                    }
194                    continue;
195                }
196                return Ok(session);
197            }
198        }
199
200        anyhow::bail!("No sessions found")
201    }
202
203    /// Load the last session (global, unscoped — legacy compatibility)
204    pub async fn last() -> Result<Self> {
205        Self::last_for_directory(None).await
206    }
207
208    /// Save the session to disk
209    pub async fn save(&self) -> Result<()> {
210        let path = Self::session_path(&self.id)?;
211
212        if let Some(parent) = path.parent() {
213            fs::create_dir_all(parent).await?;
214        }
215
216        let content = serde_json::to_string_pretty(self)?;
217        fs::write(&path, content).await?;
218
219        Ok(())
220    }
221
222    /// Add a message to the session
223    pub fn add_message(&mut self, message: Message) {
224        self.messages.push(message);
225        self.updated_at = Utc::now();
226    }
227
228    /// Execute a prompt and get the result
229    pub async fn prompt(&mut self, message: &str) -> Result<SessionResult> {
230        use crate::provider::{
231            CompletionRequest, ContentPart, ProviderRegistry, Role, parse_model_string,
232        };
233
234        // Load providers from Vault
235        let registry = ProviderRegistry::from_vault().await?;
236
237        let providers = registry.list();
238        if providers.is_empty() {
239            anyhow::bail!(
240                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
241            );
242        }
243
244        tracing::info!("Available providers: {:?}", providers);
245
246        // Parse model string (format: "provider/model", "provider", or just "model")
247        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
248            let (prov, model) = parse_model_string(model_str);
249            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
250            if prov.is_some() {
251                // Format: provider/model
252                (prov.map(|s| s.to_string()), model.to_string())
253            } else if providers.contains(&model) {
254                // Format: just provider name (e.g., "novita")
255                (Some(model.to_string()), String::new())
256            } else {
257                // Format: just model name
258                (None, model.to_string())
259            }
260        } else {
261            (None, String::new())
262        };
263
264        // Determine which provider to use with deterministic fallback ordering.
265        let selected_provider = provider_name
266            .as_deref()
267            .filter(|p| providers.contains(p))
268            .or_else(|| choose_default_provider(providers.as_slice()))
269            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
270
271        let provider = registry
272            .get(selected_provider)
273            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
274
275        // Add user message to session using add_message
276        self.add_message(Message {
277            role: Role::User,
278            content: vec![ContentPart::Text {
279                text: message.to_string(),
280            }],
281        });
282
283        // Generate title if this is the first user message and no title exists
284        if self.title.is_none() {
285            self.generate_title().await?;
286        }
287
288        // Determine model to use
289        let model = if !model_id.is_empty() {
290            model_id
291        } else {
292            Self::default_model_for_provider(selected_provider)
293        };
294
295        // Compress oversized user message via RLM if it exceeds the context threshold
296        {
297            let ctx_window = context_window_for_model(&model);
298            let msg_tokens = RlmChunker::estimate_tokens(message);
299            let threshold = (ctx_window as f64 * 0.35) as usize;
300            if msg_tokens > threshold {
301                tracing::info!(
302                    msg_tokens,
303                    threshold,
304                    ctx_window,
305                    "RLM: User message exceeds context threshold, compressing"
306                );
307                let auto_ctx = AutoProcessContext {
308                    tool_id: "session_context",
309                    tool_args: serde_json::json!({}),
310                    session_id: &self.id,
311                    abort: None,
312                    on_progress: None,
313                    provider: Arc::clone(&provider),
314                    model: model.clone(),
315                };
316                let rlm_config = RlmConfig::default();
317                match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
318                    Ok(result) => {
319                        tracing::info!(
320                            input_tokens = result.stats.input_tokens,
321                            output_tokens = result.stats.output_tokens,
322                            "RLM: User message compressed"
323                        );
324                        // Replace the last message (user message we just added)
325                        if let Some(last) = self.messages.last_mut() {
326                            last.content = vec![ContentPart::Text {
327                                text: format!(
328                                    "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
329                                    msg_tokens,
330                                    result.processed,
331                                    message.chars().take(500).collect::<String>()
332                                ),
333                            }];
334                        }
335                    }
336                    Err(e) => {
337                        tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
338                        let max_chars = threshold * 4;
339                        let truncated = RlmChunker::compress(message, max_chars / 4, None);
340                        if let Some(last) = self.messages.last_mut() {
341                            last.content = vec![ContentPart::Text {
342                                text: truncated,
343                            }];
344                        }
345                    }
346                }
347            }
348        }
349
350        // Create tool registry with all available tools
351        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
352        let tool_definitions: Vec<_> = tool_registry
353            .definitions()
354            .into_iter()
355            .filter(|tool| !is_interactive_tool(&tool.name))
356            .collect();
357
358        // Some models behave best with temperature=1.0.
359        // - Kimi K2.x requires temperature=1.0
360        // - GLM (Z.AI) defaults to temperature 1.0 for coding workflows
361        // Use contains() to match both short aliases and provider-qualified IDs.
362        let temperature = if prefers_temperature_one(&model) {
363            Some(1.0)
364        } else {
365            Some(0.7)
366        };
367
368        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
369        tracing::info!("Available tools: {}", tool_definitions.len());
370
371        // All current providers support native tool calling.  Hardcode to
372        // true so we skip the expensive list_models() API call on every message.
373        #[cfg(feature = "functiongemma")]
374        let model_supports_tools = true;
375
376        // Build system prompt with AGENTS.md
377        let cwd = self
378            .metadata
379            .directory
380            .clone()
381            .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
382        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
383
384        // Run agentic loop with tool execution
385        let max_steps = 50;
386        let mut final_output = String::new();
387
388        // Initialise the FunctionGemma tool-call router (feature-gated, opt-in).
389        #[cfg(feature = "functiongemma")]
390        let tool_router: Option<ToolCallRouter> = {
391            let cfg = ToolRouterConfig::from_env();
392            match ToolCallRouter::from_config(&cfg) {
393                Ok(r) => r,
394                Err(e) => {
395                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
396                    None
397                }
398            }
399        };
400
401        for step in 1..=max_steps {
402            tracing::info!(step = step, "Agent step starting");
403
404            // Build messages with system prompt first
405            let mut messages = vec![Message {
406                role: Role::System,
407                content: vec![ContentPart::Text {
408                    text: system_prompt.clone(),
409                }],
410            }];
411            messages.extend(self.messages.clone());
412
413            // Create completion request with tools
414            let request = CompletionRequest {
415                messages,
416                tools: tool_definitions.clone(),
417                model: model.clone(),
418                temperature,
419                top_p: None,
420                max_tokens: Some(8192),
421                stop: Vec::new(),
422            };
423
424            // Call the provider
425            let response = provider.complete(request).await?;
426
427            // Optionally route text-only responses through FunctionGemma to
428            // produce structured tool calls.  Skipped when the model natively
429            // supports tool calling (which all current providers do).
430            #[cfg(feature = "functiongemma")]
431            let response = if let Some(ref router) = tool_router {
432                router
433                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
434                    .await
435            } else {
436                response
437            };
438
439            // Record token usage
440            crate::telemetry::TOKEN_USAGE.record_model_usage(
441                &model,
442                response.usage.prompt_tokens as u64,
443                response.usage.completion_tokens as u64,
444            );
445
446            // Extract tool calls from response
447            let tool_calls: Vec<(String, String, serde_json::Value)> = response
448                .message
449                .content
450                .iter()
451                .filter_map(|part| {
452                    if let ContentPart::ToolCall {
453                        id,
454                        name,
455                        arguments,
456                    } = part
457                    {
458                        // Parse arguments JSON string into Value
459                        let args: serde_json::Value =
460                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
461                        Some((id.clone(), name.clone(), args))
462                    } else {
463                        None
464                    }
465                })
466                .collect();
467
468            // Collect text output
469            for part in &response.message.content {
470                if let ContentPart::Text { text } = part {
471                    if !text.is_empty() {
472                        final_output.push_str(text);
473                        final_output.push('\n');
474                    }
475                }
476            }
477
478            // If no tool calls, we're done
479            if tool_calls.is_empty() {
480                self.add_message(response.message.clone());
481                break;
482            }
483
484            // Add assistant message with tool calls
485            self.add_message(response.message.clone());
486
487            tracing::info!(
488                step = step,
489                num_tools = tool_calls.len(),
490                "Executing tool calls"
491            );
492
493            // Execute each tool call
494            for (tool_id, tool_name, tool_input) in tool_calls {
495                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
496
497                if is_interactive_tool(&tool_name) {
498                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
499                    self.add_message(Message {
500                        role: Role::Tool,
501                        content: vec![ContentPart::ToolResult {
502                            tool_call_id: tool_id,
503                            content: "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string(),
504                        }],
505                    });
506                    continue;
507                }
508
509                // Get and execute the tool
510                let exec_start = std::time::Instant::now();
511                let content = if let Some(tool) = tool_registry.get(&tool_name) {
512                    match tool.execute(tool_input.clone()).await {
513                        Ok(result) => {
514                            let duration_ms = exec_start.elapsed().as_millis() as u64;
515                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
516                            if let Some(audit) = try_audit_log() {
517                                audit.log_with_correlation(
518                                        AuditCategory::ToolExecution,
519                                        format!("tool:{}", tool_name),
520                                        if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
521                                        None,
522                                        Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
523                                        None,  // okr_id
524                                        None,  // okr_run_id
525                                        None,  // relay_id
526                                        Some(self.id.clone()),  // session_id
527                                    ).await;
528                            }
529                            result.output
530                        }
531                        Err(e) => {
532                            let duration_ms = exec_start.elapsed().as_millis() as u64;
533                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
534                            if let Some(audit) = try_audit_log() {
535                                audit.log_with_correlation(
536                                        AuditCategory::ToolExecution,
537                                        format!("tool:{}", tool_name),
538                                        AuditOutcome::Failure,
539                                        None,
540                                        Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
541                                        None,  // okr_id
542                                        None,  // okr_run_id
543                                        None,  // relay_id
544                                        Some(self.id.clone()),  // session_id
545                                    ).await;
546                            }
547                            format!("Error: {}", e)
548                        }
549                    }
550                } else {
551                    tracing::warn!(tool = %tool_name, "Tool not found");
552                    if let Some(audit) = try_audit_log() {
553                        audit
554                            .log_with_correlation(
555                                AuditCategory::ToolExecution,
556                                format!("tool:{}", tool_name),
557                                AuditOutcome::Failure,
558                                None,
559                                Some(json!({ "error": "unknown_tool" })),
560                                None,                  // okr_id
561                                None,                  // okr_run_id
562                                None,                  // relay_id
563                                Some(self.id.clone()), // session_id
564                            )
565                            .await;
566                    }
567                    format!("Error: Unknown tool '{}'", tool_name)
568                };
569
570                // Calculate duration for event stream
571                let duration_ms = exec_start.elapsed().as_millis() as u64;
572                let success = !content.starts_with("Error:");
573
574                // Emit event stream event for audit/compliance (SOC 2, FedRAMP, ATO)
575                if let Some(base_dir) = Self::event_stream_path() {
576                    let workspace = std::env::var("PWD")
577                        .map(PathBuf::from)
578                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
579                    let event = ChatEvent::tool_result(
580                        workspace,
581                        self.id.clone(),
582                        &tool_name,
583                        success,
584                        duration_ms,
585                        &content,
586                        self.messages.len() as u64,
587                    );
588                    let event_json = event.to_json();
589                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
590                    let seq = self.messages.len() as u64;
591                    let filename = format!(
592                        "{}-chat-events-{:020}-{:020}.jsonl",
593                        timestamp,
594                        seq * 10000,
595                        (seq + 1) * 10000
596                    );
597                    let event_path = base_dir.join(&self.id).join(filename);
598
599                    let event_path_clone = event_path;
600                    tokio::spawn(async move {
601                        if let Some(parent) = event_path_clone.parent() {
602                            let _ = tokio::fs::create_dir_all(parent).await;
603                        }
604                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
605                            .create(true)
606                            .append(true)
607                            .open(&event_path_clone)
608                            .await
609                        {
610                            use tokio::io::AsyncWriteExt;
611                            let _ = file.write_all(event_json.as_bytes()).await;
612                            let _ = file.write_all(b"\n").await;
613                        }
614                    });
615                }
616
617                // Route large tool outputs through RLM
618                let content = {
619                    let ctx_window = context_window_for_model(&model);
620                    let total_chars: usize = self.messages.iter().map(|m| m.content.iter().map(|p| match p {
621                        ContentPart::Text { text } => text.len(),
622                        ContentPart::ToolResult { content, .. } => content.len(),
623                        _ => 0,
624                    }).sum::<usize>()).sum();
625                    let current_tokens = total_chars / 4; // ~4 chars per token
626                    let routing_ctx = RoutingContext {
627                        tool_id: tool_name.clone(),
628                        session_id: self.id.clone(),
629                        call_id: Some(tool_id.clone()),
630                        model_context_limit: ctx_window,
631                        current_context_tokens: Some(current_tokens),
632                    };
633                    let rlm_config = RlmConfig::default();
634                    let routing = RlmRouter::should_route(&content, &routing_ctx, &rlm_config);
635                    if routing.should_route {
636                        tracing::info!(
637                            tool = %tool_name,
638                            reason = %routing.reason,
639                            estimated_tokens = routing.estimated_tokens,
640                            "RLM: Routing large tool output"
641                        );
642                        let auto_ctx = AutoProcessContext {
643                            tool_id: &tool_name,
644                            tool_args: tool_input.clone(),
645                            session_id: &self.id,
646                            abort: None,
647                            on_progress: None,
648                            provider: Arc::clone(&provider),
649                            model: model.clone(),
650                        };
651                        match RlmRouter::auto_process(&content, auto_ctx, &rlm_config).await {
652                            Ok(result) => {
653                                tracing::info!(
654                                    input_tokens = result.stats.input_tokens,
655                                    output_tokens = result.stats.output_tokens,
656                                    iterations = result.stats.iterations,
657                                    "RLM: Processing complete"
658                                );
659                                result.processed
660                            }
661                            Err(e) => {
662                                tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
663                                let (truncated, _, _) = RlmRouter::smart_truncate(
664                                    &content, &tool_name, &tool_input, ctx_window / 4,
665                                );
666                                truncated
667                            }
668                        }
669                    } else {
670                        content
671                    }
672                };
673
674                // Add tool result message
675                self.add_message(Message {
676                    role: Role::Tool,
677                    content: vec![ContentPart::ToolResult {
678                        tool_call_id: tool_id,
679                        content,
680                    }],
681                });
682            }
683        }
684
685        // Save session after each prompt to persist messages
686        self.save().await?;
687
688        // Archive event stream to S3/R2 if configured (for compliance: SOC 2, FedRAMP, ATO)
689        self.archive_event_stream_to_s3().await;
690
691        Ok(SessionResult {
692            text: final_output.trim().to_string(),
693            session_id: self.id.clone(),
694        })
695    }
696
697    /// Archive event stream files to S3/R2 for immutable compliance logging
698    async fn archive_event_stream_to_s3(&self) {
699        // Check if S3 is configured
700        if !S3Sink::is_configured() {
701            return;
702        }
703
704        let Some(base_dir) = Self::event_stream_path() else {
705            return;
706        };
707
708        let session_event_dir = base_dir.join(&self.id);
709        if !session_event_dir.exists() {
710            return;
711        }
712
713        // Try to create S3 sink
714        let Ok(sink) = S3Sink::from_env().await else {
715            tracing::warn!("Failed to create S3 sink for archival");
716            return;
717        };
718
719        // Upload all event files in the session directory
720        let session_id = self.id.clone();
721        tokio::spawn(async move {
722            if let Ok(mut entries) = tokio::fs::read_dir(&session_event_dir).await {
723                while let Ok(Some(entry)) = entries.next_entry().await {
724                    let path = entry.path();
725                    if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
726                        match sink.upload_file(&path, &session_id).await {
727                            Ok(url) => {
728                                tracing::info!(url = %url, "Archived event stream to S3/R2");
729                            }
730                            Err(e) => {
731                                tracing::warn!(error = %e, "Failed to archive event file to S3");
732                            }
733                        }
734                    }
735                }
736            }
737        });
738    }
739
740    /// Process a user message with real-time event streaming for UI updates.
741    /// Events are sent through the provided channel as tool calls execute.
742    ///
743    /// Accepts a pre-loaded `ProviderRegistry` to avoid re-fetching secrets
744    /// from Vault on every message (which was the primary TUI performance
745    /// bottleneck).
746    pub async fn prompt_with_events(
747        &mut self,
748        message: &str,
749        event_tx: tokio::sync::mpsc::Sender<SessionEvent>,
750        registry: std::sync::Arc<crate::provider::ProviderRegistry>,
751    ) -> Result<SessionResult> {
752        use crate::provider::{CompletionRequest, ContentPart, Role, parse_model_string};
753
754        let _ = event_tx.send(SessionEvent::Thinking).await;
755
756        let providers = registry.list();
757        if providers.is_empty() {
758            anyhow::bail!(
759                "No providers available. Configure API keys in HashiCorp Vault (for Copilot use `codetether auth copilot`)."
760            );
761        }
762        tracing::info!("Available providers: {:?}", providers);
763
764        // Parse model string (format: "provider/model", "provider", or just "model")
765        let (provider_name, model_id) = if let Some(ref model_str) = self.metadata.model {
766            let (prov, model) = parse_model_string(model_str);
767            let prov = prov.map(|p| if p == "zhipuai" { "zai" } else { p });
768            if prov.is_some() {
769                (prov.map(|s| s.to_string()), model.to_string())
770            } else if providers.contains(&model) {
771                (Some(model.to_string()), String::new())
772            } else {
773                (None, model.to_string())
774            }
775        } else {
776            (None, String::new())
777        };
778
779        // Determine which provider to use with deterministic fallback ordering.
780        let selected_provider = provider_name
781            .as_deref()
782            .filter(|p| providers.contains(p))
783            .or_else(|| choose_default_provider(providers.as_slice()))
784            .ok_or_else(|| anyhow::anyhow!("No providers available"))?;
785
786        let provider = registry
787            .get(selected_provider)
788            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", selected_provider))?;
789
790        // Add user message
791        self.add_message(Message {
792            role: Role::User,
793            content: vec![ContentPart::Text {
794                text: message.to_string(),
795            }],
796        });
797
798        // Generate title if needed
799        if self.title.is_none() {
800            self.generate_title().await?;
801        }
802
803        // Determine model
804        let model = if !model_id.is_empty() {
805            model_id
806        } else {
807            Self::default_model_for_provider(selected_provider)
808        };
809
810        // Compress oversized user message via RLM if it exceeds the context threshold
811        {
812            let ctx_window = context_window_for_model(&model);
813            let msg_tokens = RlmChunker::estimate_tokens(message);
814            let threshold = (ctx_window as f64 * 0.35) as usize;
815            if msg_tokens > threshold {
816                tracing::info!(
817                    msg_tokens,
818                    threshold,
819                    ctx_window,
820                    "RLM: User message exceeds context threshold, compressing"
821                );
822                let auto_ctx = AutoProcessContext {
823                    tool_id: "session_context",
824                    tool_args: serde_json::json!({}),
825                    session_id: &self.id,
826                    abort: None,
827                    on_progress: None,
828                    provider: Arc::clone(&provider),
829                    model: model.clone(),
830                };
831                let rlm_config = RlmConfig::default();
832                match RlmRouter::auto_process(message, auto_ctx, &rlm_config).await {
833                    Ok(result) => {
834                        tracing::info!(
835                            input_tokens = result.stats.input_tokens,
836                            output_tokens = result.stats.output_tokens,
837                            "RLM: User message compressed"
838                        );
839                        if let Some(last) = self.messages.last_mut() {
840                            last.content = vec![ContentPart::Text {
841                                text: format!(
842                                    "[Original message: {} tokens, compressed via RLM]\n\n{}\n\n---\nOriginal request prefix:\n{}",
843                                    msg_tokens,
844                                    result.processed,
845                                    message.chars().take(500).collect::<String>()
846                                ),
847                            }];
848                        }
849                    }
850                    Err(e) => {
851                        tracing::warn!(error = %e, "RLM: Failed to compress user message, using truncation");
852                        let max_chars = threshold * 4;
853                        let truncated = RlmChunker::compress(message, max_chars / 4, None);
854                        if let Some(last) = self.messages.last_mut() {
855                            last.content = vec![ContentPart::Text {
856                                text: truncated,
857                            }];
858                        }
859                    }
860                }
861            }
862        }
863
864        // Create tool registry
865        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
866        let tool_definitions: Vec<_> = tool_registry
867            .definitions()
868            .into_iter()
869            .filter(|tool| !is_interactive_tool(&tool.name))
870            .collect();
871
872        let temperature = if prefers_temperature_one(&model) {
873            Some(1.0)
874        } else {
875            Some(0.7)
876        };
877
878        tracing::info!("Using model: {} via provider: {}", model, selected_provider);
879        tracing::info!("Available tools: {}", tool_definitions.len());
880
881        // All current providers support native tool calling.  Hardcode to
882        // true so we skip the expensive list_models() API call on every message.
883        #[cfg(feature = "functiongemma")]
884        let model_supports_tools = true;
885
886        // Build system prompt
887        let cwd = std::env::var("PWD")
888            .map(std::path::PathBuf::from)
889            .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
890        let system_prompt = crate::agent::builtin::build_system_prompt(&cwd);
891
892        let mut final_output = String::new();
893        let max_steps = 50;
894
895        // Initialise the FunctionGemma tool-call router (feature-gated, opt-in).
896        #[cfg(feature = "functiongemma")]
897        let tool_router: Option<ToolCallRouter> = {
898            let cfg = ToolRouterConfig::from_env();
899            match ToolCallRouter::from_config(&cfg) {
900                Ok(r) => r,
901                Err(e) => {
902                    tracing::warn!(error = %e, "FunctionGemma tool router init failed; disabled");
903                    None
904                }
905            }
906        };
907
908        for step in 1..=max_steps {
909            tracing::info!(step = step, "Agent step starting");
910            let _ = event_tx.send(SessionEvent::Thinking).await;
911
912            // Build messages with system prompt first
913            let mut messages = vec![Message {
914                role: Role::System,
915                content: vec![ContentPart::Text {
916                    text: system_prompt.clone(),
917                }],
918            }];
919            messages.extend(self.messages.clone());
920
921            let request = CompletionRequest {
922                messages,
923                tools: tool_definitions.clone(),
924                model: model.clone(),
925                temperature,
926                top_p: None,
927                max_tokens: Some(8192),
928                stop: Vec::new(),
929            };
930
931            let llm_start = std::time::Instant::now();
932            let response = provider.complete(request).await?;
933            let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
934
935            // Optionally route text-only responses through FunctionGemma to
936            // produce structured tool calls.  Skipped for native tool-calling models.
937            #[cfg(feature = "functiongemma")]
938            let response = if let Some(ref router) = tool_router {
939                router
940                    .maybe_reformat(response, &tool_definitions, model_supports_tools)
941                    .await
942            } else {
943                response
944            };
945
946            crate::telemetry::TOKEN_USAGE.record_model_usage(
947                &model,
948                response.usage.prompt_tokens as u64,
949                response.usage.completion_tokens as u64,
950            );
951
952            // Emit usage report for TUI display
953            let _ = event_tx
954                .send(SessionEvent::UsageReport {
955                    prompt_tokens: response.usage.prompt_tokens,
956                    completion_tokens: response.usage.completion_tokens,
957                    duration_ms: llm_duration_ms,
958                    model: model.clone(),
959                })
960                .await;
961
962            // Extract tool calls
963            let tool_calls: Vec<(String, String, serde_json::Value)> = response
964                .message
965                .content
966                .iter()
967                .filter_map(|part| {
968                    if let ContentPart::ToolCall {
969                        id,
970                        name,
971                        arguments,
972                    } = part
973                    {
974                        let args: serde_json::Value =
975                            serde_json::from_str(arguments).unwrap_or(serde_json::json!({}));
976                        Some((id.clone(), name.clone(), args))
977                    } else {
978                        None
979                    }
980                })
981                .collect();
982
983            // Collect text output for this step
984            // Collect thinking and text output
985            let mut thinking_text = String::new();
986            let mut step_text = String::new();
987            for part in &response.message.content {
988                match part {
989                    ContentPart::Thinking { text } => {
990                        if !text.is_empty() {
991                            thinking_text.push_str(text);
992                            thinking_text.push('\n');
993                        }
994                    }
995                    ContentPart::Text { text } => {
996                        if !text.is_empty() {
997                            step_text.push_str(text);
998                            step_text.push('\n');
999                        }
1000                    }
1001                    _ => {}
1002                }
1003            }
1004
1005            // Emit thinking output first
1006            if !thinking_text.trim().is_empty() {
1007                let _ = event_tx
1008                    .send(SessionEvent::ThinkingComplete(
1009                        thinking_text.trim().to_string(),
1010                    ))
1011                    .await;
1012            }
1013
1014            // Emit this step's text BEFORE tool calls so it appears in correct
1015            // chronological order in the TUI chat display.
1016            if !step_text.trim().is_empty() {
1017                let trimmed = step_text.trim().to_string();
1018                let _ = event_tx
1019                    .send(SessionEvent::TextChunk(trimmed.clone()))
1020                    .await;
1021                let _ = event_tx.send(SessionEvent::TextComplete(trimmed)).await;
1022                final_output.push_str(&step_text);
1023            }
1024
1025            if tool_calls.is_empty() {
1026                self.add_message(response.message.clone());
1027                break;
1028            }
1029
1030            self.add_message(response.message.clone());
1031
1032            tracing::info!(
1033                step = step,
1034                num_tools = tool_calls.len(),
1035                "Executing tool calls"
1036            );
1037
1038            // Execute each tool call with events
1039            for (tool_id, tool_name, tool_input) in tool_calls {
1040                let args_str = serde_json::to_string(&tool_input).unwrap_or_default();
1041                let _ = event_tx
1042                    .send(SessionEvent::ToolCallStart {
1043                        name: tool_name.clone(),
1044                        arguments: args_str,
1045                    })
1046                    .await;
1047
1048                tracing::info!(tool = %tool_name, tool_id = %tool_id, "Executing tool");
1049
1050                if is_interactive_tool(&tool_name) {
1051                    tracing::warn!(tool = %tool_name, "Blocking interactive tool in session loop");
1052                    let content = "Error: Interactive tool 'question' is disabled in this interface. Ask the user directly in assistant text.".to_string();
1053                    let _ = event_tx
1054                        .send(SessionEvent::ToolCallComplete {
1055                            name: tool_name.clone(),
1056                            output: content.clone(),
1057                            success: false,
1058                        })
1059                        .await;
1060                    self.add_message(Message {
1061                        role: Role::Tool,
1062                        content: vec![ContentPart::ToolResult {
1063                            tool_call_id: tool_id,
1064                            content,
1065                        }],
1066                    });
1067                    continue;
1068                }
1069
1070                let exec_start = std::time::Instant::now();
1071                let (content, success) = if let Some(tool) = tool_registry.get(&tool_name) {
1072                    match tool.execute(tool_input.clone()).await {
1073                        Ok(result) => {
1074                            let duration_ms = exec_start.elapsed().as_millis() as u64;
1075                            tracing::info!(tool = %tool_name, success = result.success, "Tool execution completed");
1076                            if let Some(audit) = try_audit_log() {
1077                                audit.log_with_correlation(
1078                                        AuditCategory::ToolExecution,
1079                                        format!("tool:{}", tool_name),
1080                                        if result.success { AuditOutcome::Success } else { AuditOutcome::Failure },
1081                                        None,
1082                                        Some(json!({ "duration_ms": duration_ms, "output_len": result.output.len() })),
1083                                        None,  // okr_id
1084                                        None,  // okr_run_id
1085                                        None,  // relay_id
1086                                        Some(self.id.clone()),  // session_id
1087                                    ).await;
1088                            }
1089                            (result.output, result.success)
1090                        }
1091                        Err(e) => {
1092                            let duration_ms = exec_start.elapsed().as_millis() as u64;
1093                            tracing::warn!(tool = %tool_name, error = %e, "Tool execution failed");
1094                            if let Some(audit) = try_audit_log() {
1095                                audit.log_with_correlation(
1096                                        AuditCategory::ToolExecution,
1097                                        format!("tool:{}", tool_name),
1098                                        AuditOutcome::Failure,
1099                                        None,
1100                                        Some(json!({ "duration_ms": duration_ms, "error": e.to_string() })),
1101                                        None,  // okr_id
1102                                        None,  // okr_run_id
1103                                        None,  // relay_id
1104                                        Some(self.id.clone()),  // session_id
1105                                    ).await;
1106                            }
1107                            (format!("Error: {}", e), false)
1108                        }
1109                    }
1110                } else {
1111                    tracing::warn!(tool = %tool_name, "Tool not found");
1112                    if let Some(audit) = try_audit_log() {
1113                        audit
1114                            .log_with_correlation(
1115                                AuditCategory::ToolExecution,
1116                                format!("tool:{}", tool_name),
1117                                AuditOutcome::Failure,
1118                                None,
1119                                Some(json!({ "error": "unknown_tool" })),
1120                                None,                  // okr_id
1121                                None,                  // okr_run_id
1122                                None,                  // relay_id
1123                                Some(self.id.clone()), // session_id
1124                            )
1125                            .await;
1126                    }
1127                    (format!("Error: Unknown tool '{}'", tool_name), false)
1128                };
1129
1130                // Calculate total duration from exec_start (captured from line 772)
1131                let duration_ms = exec_start.elapsed().as_millis() as u64;
1132
1133                // Emit event stream event for audit/compliance (SOC 2, FedRAMP, ATO)
1134                // This creates the structured JSONL record with byte-range offsets
1135                // File format: {timestamp}-chat-events-{start_byte}-{end_byte}.jsonl
1136                if let Some(base_dir) = Self::event_stream_path() {
1137                    let workspace = std::env::var("PWD")
1138                        .map(PathBuf::from)
1139                        .unwrap_or_else(|_| std::env::current_dir().unwrap_or_default());
1140                    let event = ChatEvent::tool_result(
1141                        workspace,
1142                        self.id.clone(),
1143                        &tool_name,
1144                        success,
1145                        duration_ms,
1146                        &content,
1147                        self.messages.len() as u64,
1148                    );
1149                    let event_json = event.to_json();
1150                    let event_size = event_json.len() as u64 + 1; // +1 for newline
1151
1152                    // Generate filename with byte-range offsets for random access replay
1153                    // Format: {timestamp}-chat-events-{start_offset}-{end_offset}.jsonl
1154                    // We use a session-scoped counter stored in metadata for byte tracking
1155                    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
1156                    let seq = self.messages.len() as u64;
1157                    let filename = format!(
1158                        "{}-chat-events-{:020}-{:020}.jsonl",
1159                        timestamp,
1160                        seq * 10000,       // approximated start offset
1161                        (seq + 1) * 10000  // approximated end offset
1162                    );
1163                    let event_path = base_dir.join(&self.id).join(filename);
1164
1165                    // Fire-and-forget: don't block tool execution on event logging
1166                    let event_path_clone = event_path;
1167                    tokio::spawn(async move {
1168                        if let Some(parent) = event_path_clone.parent() {
1169                            let _ = tokio::fs::create_dir_all(parent).await;
1170                        }
1171                        if let Ok(mut file) = tokio::fs::OpenOptions::new()
1172                            .create(true)
1173                            .append(true)
1174                            .open(&event_path_clone)
1175                            .await
1176                        {
1177                            use tokio::io::AsyncWriteExt;
1178                            let _ = file.write_all(event_json.as_bytes()).await;
1179                            let _ = file.write_all(b"\n").await;
1180                            tracing::debug!(path = %event_path_clone.display(), size = event_size, "Event stream wrote");
1181                        }
1182                    });
1183                }
1184
1185                let _ = event_tx
1186                    .send(SessionEvent::ToolCallComplete {
1187                        name: tool_name.clone(),
1188                        output: content.clone(),
1189                        success,
1190                    })
1191                    .await;
1192
1193                // Route large tool outputs through RLM
1194                let content = {
1195                    let ctx_window = context_window_for_model(&model);
1196                    let total_chars: usize = self.messages.iter().map(|m| m.content.iter().map(|p| match p {
1197                        ContentPart::Text { text } => text.len(),
1198                        ContentPart::ToolResult { content, .. } => content.len(),
1199                        _ => 0,
1200                    }).sum::<usize>()).sum();
1201                    let current_tokens = total_chars / 4;
1202                    let routing_ctx = RoutingContext {
1203                        tool_id: tool_name.clone(),
1204                        session_id: self.id.clone(),
1205                        call_id: Some(tool_id.clone()),
1206                        model_context_limit: ctx_window,
1207                        current_context_tokens: Some(current_tokens),
1208                    };
1209                    let rlm_config = RlmConfig::default();
1210                    let routing = RlmRouter::should_route(&content, &routing_ctx, &rlm_config);
1211                    if routing.should_route {
1212                        tracing::info!(
1213                            tool = %tool_name,
1214                            reason = %routing.reason,
1215                            estimated_tokens = routing.estimated_tokens,
1216                            "RLM: Routing large tool output"
1217                        );
1218                        let auto_ctx = AutoProcessContext {
1219                            tool_id: &tool_name,
1220                            tool_args: tool_input.clone(),
1221                            session_id: &self.id,
1222                            abort: None,
1223                            on_progress: None,
1224                            provider: Arc::clone(&provider),
1225                            model: model.clone(),
1226                        };
1227                        match RlmRouter::auto_process(&content, auto_ctx, &rlm_config).await {
1228                            Ok(result) => {
1229                                tracing::info!(
1230                                    input_tokens = result.stats.input_tokens,
1231                                    output_tokens = result.stats.output_tokens,
1232                                    iterations = result.stats.iterations,
1233                                    "RLM: Processing complete"
1234                                );
1235                                result.processed
1236                            }
1237                            Err(e) => {
1238                                tracing::warn!(error = %e, "RLM: auto_process failed, using smart_truncate");
1239                                let (truncated, _, _) = RlmRouter::smart_truncate(
1240                                    &content, &tool_name, &tool_input, ctx_window / 4,
1241                                );
1242                                truncated
1243                            }
1244                        }
1245                    } else {
1246                        content
1247                    }
1248                };
1249
1250                self.add_message(Message {
1251                    role: Role::Tool,
1252                    content: vec![ContentPart::ToolResult {
1253                        tool_call_id: tool_id,
1254                        content,
1255                    }],
1256                });
1257            }
1258        }
1259
1260        self.save().await?;
1261
1262        // Archive event stream to S3/R2 if configured (for compliance: SOC 2, FedRAMP, ATO)
1263        self.archive_event_stream_to_s3().await;
1264
1265        // Text was already sent per-step via TextComplete events.
1266        // Send updated session state so the caller can sync back.
1267        let _ = event_tx.send(SessionEvent::SessionSync(self.clone())).await;
1268        let _ = event_tx.send(SessionEvent::Done).await;
1269
1270        Ok(SessionResult {
1271            text: final_output.trim().to_string(),
1272            session_id: self.id.clone(),
1273        })
1274    }
1275
1276    /// Generate a title for the session based on the first message
1277    /// Only sets title if not already set (for initial title generation)
1278    pub async fn generate_title(&mut self) -> Result<()> {
1279        if self.title.is_some() {
1280            return Ok(());
1281        }
1282
1283        // Get first user message
1284        let first_message = self
1285            .messages
1286            .iter()
1287            .find(|m| m.role == crate::provider::Role::User);
1288
1289        if let Some(msg) = first_message {
1290            let text: String = msg
1291                .content
1292                .iter()
1293                .filter_map(|p| match p {
1294                    crate::provider::ContentPart::Text { text } => Some(text.clone()),
1295                    _ => None,
1296                })
1297                .collect::<Vec<_>>()
1298                .join(" ");
1299
1300            // Truncate to reasonable length
1301            self.title = Some(truncate_with_ellipsis(&text, 47));
1302        }
1303
1304        Ok(())
1305    }
1306
1307    /// Regenerate the title based on the first message, even if already set
1308    /// Use this for on-demand title updates or after context changes
1309    pub async fn regenerate_title(&mut self) -> Result<()> {
1310        // Get first user message
1311        let first_message = self
1312            .messages
1313            .iter()
1314            .find(|m| m.role == crate::provider::Role::User);
1315
1316        if let Some(msg) = first_message {
1317            let text: String = msg
1318                .content
1319                .iter()
1320                .filter_map(|p| match p {
1321                    crate::provider::ContentPart::Text { text } => Some(text.clone()),
1322                    _ => None,
1323                })
1324                .collect::<Vec<_>>()
1325                .join(" ");
1326
1327            // Truncate to reasonable length
1328            self.title = Some(truncate_with_ellipsis(&text, 47));
1329        }
1330
1331        Ok(())
1332    }
1333
1334    /// Set a custom title for the session
1335    pub fn set_title(&mut self, title: impl Into<String>) {
1336        self.title = Some(title.into());
1337        self.updated_at = Utc::now();
1338    }
1339
1340    /// Clear the title, allowing it to be regenerated
1341    pub fn clear_title(&mut self) {
1342        self.title = None;
1343        self.updated_at = Utc::now();
1344    }
1345
1346    /// Handle context change - updates metadata and optionally regenerates title
1347    /// Call this when the session context changes (e.g., directory change, model change)
1348    pub async fn on_context_change(&mut self, regenerate_title: bool) -> Result<()> {
1349        self.updated_at = Utc::now();
1350
1351        if regenerate_title {
1352            self.regenerate_title().await?;
1353        }
1354
1355        Ok(())
1356    }
1357
1358    /// Import an OpenCode session into CodeTether
1359    ///
1360    /// Loads messages and parts from OpenCode storage and converts them
1361    /// into a CodeTether session that can be resumed.
1362    pub async fn from_opencode(
1363        session_id: &str,
1364        storage: &crate::opencode::OpenCodeStorage,
1365    ) -> Result<Self> {
1366        let oc_session = storage.load_session(session_id).await?;
1367        let oc_messages = storage.load_messages(session_id).await?;
1368
1369        let mut messages_with_parts = Vec::new();
1370        for msg in oc_messages {
1371            let parts = storage.load_parts(&msg.id).await?;
1372            messages_with_parts.push((msg, parts));
1373        }
1374
1375        crate::opencode::convert::to_codetether_session(&oc_session, messages_with_parts).await
1376    }
1377
1378    /// Try to load the last OpenCode session for a directory as a fallback
1379    pub async fn last_opencode_for_directory(dir: &std::path::Path) -> Result<Self> {
1380        let storage = crate::opencode::OpenCodeStorage::new()
1381            .ok_or_else(|| anyhow::anyhow!("OpenCode storage directory not found"))?;
1382
1383        if !storage.exists() {
1384            anyhow::bail!("OpenCode storage does not exist");
1385        }
1386
1387        let oc_session = storage.last_session_for_directory(dir).await?;
1388        Self::from_opencode(&oc_session.id, &storage).await
1389    }
1390
1391    /// Delete a session by ID
1392    pub async fn delete(id: &str) -> Result<()> {
1393        let path = Self::session_path(id)?;
1394        if path.exists() {
1395            tokio::fs::remove_file(&path).await?;
1396        }
1397        Ok(())
1398    }
1399
1400    /// Get the sessions directory
1401    fn sessions_dir() -> Result<PathBuf> {
1402        crate::config::Config::data_dir()
1403            .map(|d| d.join("sessions"))
1404            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))
1405    }
1406
1407    /// Get the path for a session file
1408    fn session_path(id: &str) -> Result<PathBuf> {
1409        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
1410    }
1411
1412    /// Get the event stream path from environment, if configured.
1413    /// Returns None if CODETETHER_EVENT_STREAM_PATH is not set.
1414    fn event_stream_path() -> Option<PathBuf> {
1415        std::env::var("CODETETHER_EVENT_STREAM_PATH")
1416            .ok()
1417            .map(PathBuf::from)
1418    }
1419}
1420
1421/// Result from a session prompt
1422#[derive(Debug, Clone, Serialize, Deserialize)]
1423pub struct SessionResult {
1424    pub text: String,
1425    pub session_id: String,
1426}
1427
1428/// Events emitted during session processing for real-time UI updates
1429#[derive(Debug, Clone)]
1430pub enum SessionEvent {
1431    /// Agent is thinking/processing
1432    Thinking,
1433    /// Tool call started
1434    ToolCallStart { name: String, arguments: String },
1435    /// Tool call completed with result
1436    ToolCallComplete {
1437        name: String,
1438        output: String,
1439        success: bool,
1440    },
1441    /// Partial text output (for streaming)
1442    TextChunk(String),
1443    /// Final text output
1444    TextComplete(String),
1445    /// Model thinking/reasoning output
1446    ThinkingComplete(String),
1447    /// Token usage for one LLM round-trip
1448    UsageReport {
1449        prompt_tokens: usize,
1450        completion_tokens: usize,
1451        duration_ms: u64,
1452        model: String,
1453    },
1454    /// Updated session state for caller to sync back
1455    SessionSync(Session),
1456    /// Processing complete
1457    Done,
1458    /// Error occurred
1459    Error(String),
1460}
1461
1462/// List all sessions
1463pub async fn list_sessions() -> Result<Vec<SessionSummary>> {
1464    let sessions_dir = crate::config::Config::data_dir()
1465        .map(|d| d.join("sessions"))
1466        .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
1467
1468    if !sessions_dir.exists() {
1469        return Ok(Vec::new());
1470    }
1471
1472    let mut summaries = Vec::new();
1473    let mut entries = fs::read_dir(&sessions_dir).await?;
1474
1475    while let Some(entry) = entries.next_entry().await? {
1476        let path = entry.path();
1477        if path.extension().map(|e| e == "json").unwrap_or(false) {
1478            if let Ok(content) = fs::read_to_string(&path).await {
1479                if let Ok(session) = serde_json::from_str::<Session>(&content) {
1480                    summaries.push(SessionSummary {
1481                        id: session.id,
1482                        title: session.title,
1483                        created_at: session.created_at,
1484                        updated_at: session.updated_at,
1485                        message_count: session.messages.len(),
1486                        agent: session.agent,
1487                        directory: session.metadata.directory,
1488                    });
1489                }
1490            }
1491        }
1492    }
1493
1494    summaries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1495    Ok(summaries)
1496}
1497
1498/// List sessions scoped to a specific directory (workspace)
1499///
1500/// Only returns sessions whose `metadata.directory` matches the given path.
1501/// This prevents sessions from other workspaces "leaking" into the TUI.
1502pub async fn list_sessions_for_directory(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1503    let all = list_sessions().await?;
1504    let canonical = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
1505    Ok(all
1506        .into_iter()
1507        .filter(|s| {
1508            s.directory
1509                .as_ref()
1510                .map(|d| d.canonicalize().unwrap_or_else(|_| d.clone()) == canonical)
1511                .unwrap_or(false)
1512        })
1513        .collect())
1514}
1515
1516/// List sessions including OpenCode sessions for a directory.
1517///
1518/// Merges CodeTether sessions with any discovered OpenCode sessions,
1519/// sorted by most recently updated first. OpenCode sessions are
1520/// prefixed with `opencode_` in their ID.
1521pub async fn list_sessions_with_opencode(dir: &std::path::Path) -> Result<Vec<SessionSummary>> {
1522    let mut sessions = list_sessions_for_directory(dir).await?;
1523
1524    // Also include OpenCode sessions if available
1525    if let Some(storage) = crate::opencode::OpenCodeStorage::new() {
1526        if storage.exists() {
1527            if let Ok(oc_sessions) = storage.list_sessions_for_directory(dir).await {
1528                for oc in oc_sessions {
1529                    // Skip if we already have a CodeTether import of this session
1530                    let import_id = format!("opencode_{}", oc.id);
1531                    if sessions.iter().any(|s| s.id == import_id) {
1532                        continue;
1533                    }
1534
1535                    sessions.push(SessionSummary {
1536                        id: import_id,
1537                        title: Some(format!("[opencode] {}", oc.title)),
1538                        created_at: oc.created_at,
1539                        updated_at: oc.updated_at,
1540                        message_count: oc.message_count,
1541                        agent: "build".to_string(),
1542                        directory: Some(PathBuf::from(&oc.directory)),
1543                    });
1544                }
1545            }
1546        }
1547    }
1548
1549    // Re-sort merged list by updated_at descending
1550    sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1551    Ok(sessions)
1552}
1553
1554/// List sessions including OpenCode sessions for a directory with pagination.
1555///
1556/// This is more efficient than `list_sessions_with_opencode` when you only need
1557/// a subset of sessions (e.g., for the TUI session picker).
1558///
1559/// - `limit`: Maximum number of sessions to return (default: 100)
1560/// - `offset`: Number of sessions to skip (default: 0)
1561pub async fn list_sessions_with_opencode_paged(
1562    dir: &std::path::Path,
1563    limit: usize,
1564    offset: usize,
1565) -> Result<Vec<SessionSummary>> {
1566    let mut sessions = list_sessions_for_directory(dir).await?;
1567
1568    // Also include OpenCode sessions if available
1569    if let Some(storage) = crate::opencode::OpenCodeStorage::new() {
1570        if storage.exists() {
1571            if let Ok(oc_sessions) = storage.list_sessions_for_directory(dir).await {
1572                for oc in oc_sessions {
1573                    // Skip if we already have a CodeTether import of this session
1574                    let import_id = format!("opencode_{}", oc.id);
1575                    if sessions.iter().any(|s| s.id == import_id) {
1576                        continue;
1577                    }
1578
1579                    sessions.push(SessionSummary {
1580                        id: import_id,
1581                        title: Some(format!("[opencode] {}", oc.title)),
1582                        created_at: oc.created_at,
1583                        updated_at: oc.updated_at,
1584                        message_count: oc.message_count,
1585                        agent: "build".to_string(),
1586                        directory: Some(PathBuf::from(&oc.directory)),
1587                    });
1588                }
1589            }
1590        }
1591    }
1592
1593    // Re-sort merged list by updated_at descending
1594    sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1595
1596    // Apply pagination
1597    Ok(sessions.into_iter().skip(offset).take(limit).collect())
1598}
1599
1600/// Summary of a session for listing
1601#[derive(Debug, Clone, Serialize, Deserialize)]
1602pub struct SessionSummary {
1603    pub id: String,
1604    pub title: Option<String>,
1605    pub created_at: DateTime<Utc>,
1606    pub updated_at: DateTime<Utc>,
1607    pub message_count: usize,
1608    pub agent: String,
1609    /// The working directory this session was created in
1610    #[serde(default)]
1611    pub directory: Option<PathBuf>,
1612}
1613
1614fn truncate_with_ellipsis(value: &str, max_chars: usize) -> String {
1615    if max_chars == 0 {
1616        return String::new();
1617    }
1618
1619    let mut chars = value.chars();
1620    let mut output = String::new();
1621    for _ in 0..max_chars {
1622        if let Some(ch) = chars.next() {
1623            output.push(ch);
1624        } else {
1625            return value.to_string();
1626        }
1627    }
1628
1629    if chars.next().is_some() {
1630        format!("{output}...")
1631    } else {
1632        output
1633    }
1634}
1635
1636// Async helper for Vec - kept for potential future use
1637#[allow(dead_code)]
1638use futures::StreamExt;
1639
1640#[allow(dead_code)]
1641trait AsyncCollect<T> {
1642    async fn collect(self) -> Vec<T>;
1643}
1644
1645#[allow(dead_code)]
1646impl<S, T> AsyncCollect<T> for S
1647where
1648    S: futures::Stream<Item = T> + Unpin,
1649{
1650    async fn collect(mut self) -> Vec<T> {
1651        let mut items = Vec::new();
1652        while let Some(item) = self.next().await {
1653            items.push(item);
1654        }
1655        items
1656    }
1657}