use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use imp_llm::auth::{ApiKey, AuthStore};
use imp_llm::model::{ModelMeta, ModelRegistry};
use imp_llm::providers::create_provider;
use imp_llm::{Model, ThinkingLevel};
use crate::agent::{Agent, AgentCommand, AgentEvent, AgentHandle};
use crate::builder::AgentBuilder;
use crate::config::{AgentMode, Config};
use crate::error::{Error, Result};
use crate::policy::RunPolicy;
use crate::session::{SessionCheckpointRecord, SessionEntry, SessionManager};
use crate::storage;
use crate::system_prompt::{Fact, TaskContext};
use crate::ui::UserInterface;
/// Selects which session store `ImpSession::create` attaches to a new session.
#[derive(Debug, Clone, Default)]
pub enum SessionChoice {
/// Start a fresh session via `SessionManager::new` (the default choice).
#[default]
New,
/// Use the store returned by `SessionManager::in_memory`.
InMemory,
/// Resume via `SessionManager::continue_recent`; when that yields `None`,
/// a new session is created instead.
Continue,
/// Open the session stored at the given path via `SessionManager::open`.
Open(PathBuf),
}
use crate::tools::LuaToolLoader;
use crate::workflow::{AutonomyMode, VerificationGate};
/// Caller-supplied settings consumed by `ImpSession::create`.
pub struct SessionOptions {
/// Working directory used for config resolution, the agent builder and the
/// session manager.
pub cwd: PathBuf,
/// Ready-made model; when set, `create` skips model/provider/credential
/// resolution and builds the agent with an empty API key.
pub model_override: Option<Model>,
/// Model hint; `create` falls back to the configured model and then to
/// `"sonnet"` when this is `None`.
pub model: Option<String>,
/// Provider to force instead of the provider listed in the model metadata.
pub provider: Option<String>,
/// API key registered as a runtime key for the resolved provider.
pub api_key: Option<String>,
/// Overrides `config.thinking` when set.
pub thinking: Option<ThinkingLevel>,
/// Overrides `config.mode` when set.
pub mode: Option<AgentMode>,
/// Passed to the agent builder when set.
pub autonomy_mode: Option<AutonomyMode>,
/// Always passed to the agent builder (possibly empty).
pub verification_gates: Vec<VerificationGate>,
/// NOTE(review): not read by `ImpSession::create` in the visible code —
/// confirm whether it is consumed elsewhere.
pub max_turns: Option<u32>,
/// Output token cap applied to the agent; with `no_tools` the configured
/// `max_tokens` is used as a fallback.
pub max_tokens: Option<u32>,
/// Passed to the agent builder as the system prompt when set.
pub system_prompt: Option<String>,
/// When true, every tool is removed from the built agent.
pub no_tools: bool,
/// Which session store to attach.
pub session: SessionChoice,
/// Task context passed to the agent builder when set.
pub task: Option<TaskContext>,
/// Facts passed to the agent builder when non-empty.
pub facts: Vec<Fact>,
/// Loader invoked by the builder's Lua tool hook when set.
pub lua_loader: Option<LuaToolLoader>,
/// Run policy passed to the agent builder.
pub run_policy: RunPolicy,
/// Replaces the agent's UI when set.
pub ui: Option<Arc<dyn UserInterface>>,
/// Explicit credential-store path; otherwise the global auth path is used.
pub auth_path: Option<PathBuf>,
/// Messages injected once into the history on the first prompt.
pub context_prefill: Vec<imp_llm::Message>,
}
impl Default for SessionOptions {
    /// Returns options with no overrides: every optional field is `None`,
    /// every list is empty, tools stay enabled and a new session is requested.
    fn default() -> Self {
        // Fall back to "." when the process working directory cannot be read.
        let cwd = match std::env::current_dir() {
            Ok(dir) => dir,
            Err(_) => PathBuf::from("."),
        };
        Self {
            cwd,
            // Non-optional settings.
            no_tools: false,
            session: SessionChoice::default(),
            run_policy: RunPolicy::default(),
            // Collections start out empty.
            verification_gates: Vec::new(),
            facts: Vec::new(),
            context_prefill: Vec::new(),
            // Everything else is left unset.
            model_override: None,
            model: None,
            provider: None,
            api_key: None,
            thinking: None,
            mode: None,
            autonomy_mode: None,
            max_turns: None,
            max_tokens: None,
            system_prompt: None,
            task: None,
            lua_loader: None,
            ui: None,
            auth_path: None,
        }
    }
}
/// Inputs to `resolve_runtime_connection`: what the caller asked for, before
/// any credential-based rerouting.
#[derive(Debug, Clone)]
pub struct RuntimeConnectionIntent<'a> {
/// Explicit model hint; takes precedence over `config_model`.
pub model_hint: Option<&'a str>,
/// Model from configuration, used when `model_hint` is `None`.
pub config_model: Option<&'a str>,
/// Provider forced by the caller instead of the model's own provider.
pub provider_override: Option<&'a str>,
/// True when the caller supplied an API key; this disables OAuth rerouting.
pub api_key_override_present: bool,
}
/// Outcome of `resolve_runtime_connection`: the model id and provider name to
/// connect with.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedRuntimeConnection {
/// Identifier of the model to use on the chosen provider.
pub model_id: String,
/// Name of the provider to route through.
pub provider_name: String,
}
pub fn resolve_runtime_connection(
intent: RuntimeConnectionIntent<'_>,
auth_store: &AuthStore,
registry: &ModelRegistry,
) -> std::result::Result<ResolvedRuntimeConnection, String> {
let model_hint = intent
.model_hint
.or(intent.config_model)
.unwrap_or("sonnet");
let meta = registry
.resolve_meta(model_hint, intent.provider_override)
.ok_or_else(|| format!("Unknown model: {model_hint}"))?;
let provider_name = intent
.provider_override
.unwrap_or(&meta.provider)
.to_string();
if let Some(oauth_route) = auth_preferred_oauth_route(
intent.provider_override,
intent.api_key_override_present,
auth_store,
registry,
&meta,
&provider_name,
) {
return Ok(oauth_route);
}
Ok(ResolvedRuntimeConnection {
model_id: meta.id.clone(),
provider_name,
})
}
/// A running agent session: owns the agent, its command/event handle, the
/// session store and the credentials/model state used to drive it.
pub struct ImpSession {
/// The agent; `None` while a spawned run owns it (see `prompt`).
agent: Option<Agent>,
/// Command sender and event receiver paired with the agent.
handle: AgentHandle,
/// Store that user prompts and agent events are persisted into.
session_mgr: SessionManager,
/// Resolved configuration with per-session overrides applied.
config: Config,
/// Model currently selected for the session.
model: Model,
/// Credential store used to resolve API keys.
auth_store: AuthStore,
/// Registry used to resolve model hints.
model_registry: ModelRegistry,
/// Working directory the session was created with.
cwd: PathBuf,
/// Join handle of the in-flight run; it yields the agent back with the result.
agent_task: Option<JoinHandle<(Agent, Result<()>)>>,
/// Result of the last finished or aborted run, consumed by `wait`.
completed_run_result: Option<Result<()>>,
/// Queued warnings that `recv_event` surfaces as `AgentEvent::Error`.
pending_persistence_errors: VecDeque<String>,
/// Messages injected into the history on the first prompt.
context_prefill: Vec<imp_llm::Message>,
/// Set once `context_prefill` has been injected.
context_prefill_injected: bool,
}
impl ImpSession {
/// Builds a session from `options`: resolves configuration, credentials and
/// the model route, assembles the agent, and attaches the requested session
/// store.
///
/// When `options.model_override` is set, the model is used as-is and the agent
/// is built with an empty API key; no provider lookup or credential resolution
/// happens.
///
/// # Errors
/// Returns an error when configuration cannot be resolved, the model or
/// provider is unknown, credentials cannot be resolved, the agent cannot be
/// built, or the session store cannot be created/opened (including the
/// fallback store for `SessionChoice::Continue`).
pub async fn create(options: SessionOptions) -> Result<Self> {
    let cwd = options.cwd.clone();
    // The result is intentionally ignored so a failure here never blocks
    // session creation (presumably a legacy-storage migration — TODO confirm).
    let _ = storage::reconcile_legacy_into_global_root();

    let mut config = Config::resolve(&Config::user_config_dir(), Some(&cwd))?;
    if let Some(thinking) = options.thinking {
        config.thinking = Some(thinking);
    }
    if let Some(mode) = options.mode {
        config.mode = mode;
    }

    let auth_path = options
        .auth_path
        .clone()
        .or_else(storage::existing_global_auth_path)
        .unwrap_or_else(storage::global_auth_path);
    // An unreadable credential file degrades to an empty store at the same path.
    let mut auth_store =
        AuthStore::load(&auth_path).unwrap_or_else(|_| AuthStore::new(auth_path));

    let model_registry = ModelRegistry::with_builtins();
    let (model, api_key) = if let Some(model) = options.model_override.as_ref() {
        (clone_model(model), String::new())
    } else {
        let runtime_connection = resolve_runtime_connection(
            RuntimeConnectionIntent {
                model_hint: options.model.as_deref(),
                config_model: config.model.as_deref(),
                provider_override: options.provider.as_deref(),
                api_key_override_present: options.api_key.is_some(),
            },
            &auth_store,
            &model_registry,
        )
        .map_err(Error::Config)?;
        // Re-resolve against the final provider: OAuth rerouting may have
        // changed both the model id and the provider name.
        let meta = model_registry
            .resolve_meta(
                &runtime_connection.model_id,
                Some(&runtime_connection.provider_name),
            )
            .ok_or_else(|| {
                Error::Config(format!(
                    "Unknown model/provider route: {} via {}",
                    runtime_connection.model_id, runtime_connection.provider_name
                ))
            })?;
        let provider_name = runtime_connection.provider_name.clone();
        if let Some(ref key) = options.api_key {
            auth_store.set_runtime_key(&provider_name, key.clone());
        }
        let provider = create_provider(&provider_name)
            .ok_or_else(|| Error::Config(format!("Unknown provider: {provider_name}")))?;
        let api_key = resolve_api_key(&mut auth_store, &provider_name).await?;
        (
            Model {
                meta,
                provider: Arc::from(provider),
            },
            api_key,
        )
    };

    let mut builder =
        AgentBuilder::new(config.clone(), cwd.clone(), clone_model(&model), api_key);
    if let Some(task) = &options.task {
        builder = builder.task(task.clone());
    }
    if !options.facts.is_empty() {
        builder = builder.facts(options.facts.clone());
    }
    if let Some(prompt) = &options.system_prompt {
        builder = builder.system_prompt(prompt.clone());
    }
    if let Some(lua_loader) = options.lua_loader {
        builder = builder.lua_tool_loader(move |policy, tools| lua_loader(policy, tools));
    }
    if let Some(autonomy_mode) = options.autonomy_mode {
        builder = builder.autonomy_mode(autonomy_mode);
    }
    builder = builder.verification_gates(options.verification_gates.clone());
    builder = builder.run_policy(options.run_policy.clone());
    let (mut agent, handle) = builder.build()?;

    if options.no_tools {
        // Tool-less sessions: drop every tool and take thinking/max-token
        // settings from the options with the configuration as fallback.
        agent.tools.retain(|_| false);
        agent.thinking_level = config.thinking.unwrap_or(ThinkingLevel::Off);
        if let Some(max_tokens) = options.max_tokens.or(config.max_tokens) {
            agent.max_tokens = Some(max_tokens);
        }
    } else if let Some(max_tokens) = options.max_tokens {
        agent.max_tokens = Some(max_tokens);
    }
    if let Some(ui) = &options.ui {
        agent.ui = Arc::clone(ui);
    }

    let session_dir = storage::global_sessions_dir();
    let session_mgr = match options.session {
        SessionChoice::New => SessionManager::new(&cwd, &session_dir)?,
        SessionChoice::InMemory => SessionManager::in_memory(),
        // Propagate a failure to create the fallback session instead of panicking.
        SessionChoice::Continue => match SessionManager::continue_recent(&cwd, &session_dir)? {
            Some(existing) => existing,
            None => SessionManager::new(&cwd, &session_dir)?,
        },
        SessionChoice::Open(ref path) => SessionManager::open(path)?,
    };

    Ok(Self {
        agent: Some(agent),
        handle,
        session_mgr,
        config,
        model,
        auth_store,
        model_registry,
        cwd,
        context_prefill: options.context_prefill,
        context_prefill_injected: false,
        agent_task: None,
        completed_run_result: None,
        pending_persistence_errors: VecDeque::new(),
    })
}
pub async fn prompt(&mut self, text: &str) -> Result<()> {
if self.agent_task.is_some() {
return Err(Error::Config(
"Agent is already running. Cancel or wait for it to finish.".into(),
));
}
self.completed_run_result = None;
self.pending_persistence_errors.clear();
let msg_id = uuid::Uuid::new_v4().to_string();
let _ = self.session_mgr.append(SessionEntry::Message {
id: msg_id,
parent_id: None,
message: imp_llm::Message::user(text),
});
let mut agent = self
.agent
.take()
.ok_or_else(|| Error::Config("Agent already consumed".into()))?;
let mut history: Vec<imp_llm::Message> = self.session_mgr.get_active_messages();
if matches!(
history.last(),
Some(imp_llm::Message::User(user))
if matches!(
user.content.as_slice(),
[imp_llm::ContentBlock::Text { text: last_text }] if last_text == text
)
) {
history.pop();
}
if !self.context_prefill_injected && !self.context_prefill.is_empty() {
for msg in &self.context_prefill {
history.push(msg.clone());
}
history.push(imp_llm::Message::Assistant(imp_llm::AssistantMessage {
content: vec![imp_llm::ContentBlock::Text {
text: "Context loaded. Ready to work.".into(),
}],
usage: None,
stop_reason: imp_llm::StopReason::EndTurn,
timestamp: imp_llm::now(),
}));
self.context_prefill_injected = true;
}
agent.messages = history;
let prompt = text.to_string();
let task = tokio::spawn(async move {
let result = agent.run(prompt).await;
(agent, result)
});
self.agent_task = Some(task);
Ok(())
}
/// Starts a run for `text` via `prompt` and then blocks on `wait`, returning
/// the first error from either step.
pub async fn prompt_and_wait(&mut self, text: &str) -> Result<()> {
self.prompt(text).await?;
self.wait().await
}
pub async fn wait(&mut self) -> Result<()> {
if let Some(task) = self.agent_task.take() {
let (agent, result) = task
.await
.map_err(|e| Error::Config(format!("Agent task panicked: {e}")))?;
self.agent = Some(agent);
self.completed_run_result = Some(result);
self.drain_pending_events_for_persistence();
}
if let Some(result) = self.completed_run_result.take() {
return result;
}
Ok(())
}
pub async fn steer(&self, text: &str) -> Result<()> {
self.handle
.command_tx
.send(AgentCommand::Steer(text.into()))
.await
.map_err(|_| Error::Config("Agent not running".into()))
}
pub async fn follow_up(&self, text: &str) -> Result<()> {
self.handle
.command_tx
.send(AgentCommand::FollowUp(text.into()))
.await
.map_err(|_| Error::Config("Agent not running".into()))
}
pub async fn cancel(&self) -> Result<()> {
self.handle
.command_tx
.send(AgentCommand::Cancel)
.await
.map_err(|_| Error::Config("Agent not running".into()))
}
/// Aborts the in-flight run, if any, and records `Error::Cancelled` as its
/// result so a following `wait` reports the cancellation.
///
/// The aborted task owns the agent, and nothing here puts it back, so
/// `self.agent` stays `None` afterwards.
pub fn abort(&mut self) {
    let task = match self.agent_task.take() {
        Some(task) => task,
        None => return,
    };
    task.abort();
    self.completed_run_result = Some(Err(Error::Cancelled));
}
pub async fn recv_event(&mut self) -> Option<AgentEvent> {
if let Some(error) = self.take_persistence_error() {
return Some(AgentEvent::Error { error });
}
if self.agent_task.is_none() && self.completed_run_result.is_some() {
return None;
}
let event = self.handle.event_rx.recv().await?;
let events = self.persist_event_entries(&event);
if matches!(event, AgentEvent::AgentEnd { .. }) {
if let Some(task) = self.agent_task.take() {
match task.await {
Ok((agent, result)) => {
self.agent = Some(agent);
self.completed_run_result = Some(result);
}
Err(join_error) => {
self.push_persistence_error(
events,
format!("agent task panicked: {join_error}"),
);
}
}
}
}
Some(event)
}
/// Returns the raw event receiver. Events read directly from it do not go
/// through the persistence step that `recv_event` performs.
pub fn event_rx(&mut self) -> &mut mpsc::Receiver<AgentEvent> {
&mut self.handle.event_rx
}
pub async fn set_model(&mut self, hint: &str) -> Result<()> {
let meta = self
.model_registry
.resolve_meta(hint, None)
.ok_or_else(|| Error::Config(format!("Unknown model: {hint}")))?;
let provider_name = meta.provider.clone();
let provider = create_provider(&provider_name)
.ok_or_else(|| Error::Config(format!("Unknown provider: {provider_name}")))?;
let api_key = resolve_api_key(&mut self.auth_store, &provider_name).await?;
self.model = Model {
meta,
provider: Arc::from(provider),
};
if let Some(ref mut agent) = self.agent {
agent.model = clone_model(&self.model);
agent.api_key = api_key;
}
Ok(())
}
pub fn set_thinking(&mut self, level: ThinkingLevel) {
self.config.thinking = Some(level);
if let Some(ref mut agent) = self.agent {
agent.thinking_level = level;
}
}
/// Returns the model currently selected for the session.
pub fn model(&self) -> &Model {
&self.model
}
/// Returns the session's configuration.
pub fn config(&self) -> &Config {
&self.config
}
/// Returns a shared reference to the session store.
pub fn session_manager(&self) -> &SessionManager {
&self.session_mgr
}
/// Returns a mutable reference to the session store.
pub fn session_manager_mut(&mut self) -> &mut SessionManager {
&mut self.session_mgr
}
/// Returns the working directory the session was created with.
pub fn cwd(&self) -> &PathBuf {
&self.cwd
}
/// Returns a shared reference to the credential store.
pub fn auth_store(&self) -> &AuthStore {
&self.auth_store
}
/// Returns a mutable reference to the credential store.
pub fn auth_store_mut(&mut self) -> &mut AuthStore {
&mut self.auth_store
}
/// Returns the registry used to resolve model hints.
pub fn model_registry(&self) -> &ModelRegistry {
&self.model_registry
}
/// Returns true while the session holds the join handle of a spawned run.
pub fn is_running(&self) -> bool {
self.agent_task.is_some()
}
/// Returns the sender used to deliver `AgentCommand`s to the agent.
pub fn command_tx(&self) -> &mpsc::Sender<AgentCommand> {
&self.handle.command_tx
}
/// Persists the session entries derived from `event` and, when the session
/// currently holds the agent, any checkpoint records not yet stored.
///
/// Failures are queued as persistence warnings rather than returned. The
/// return value is whatever `persist_agent_event_entries` reported as
/// persisted, or an empty list when that call failed.
fn persist_event_entries(&mut self, event: &AgentEvent) -> Vec<&'static str> {
let persisted = match self
.session_mgr
.persist_agent_event_entries(&self.model, event)
{
Ok(persisted) => persisted,
Err(error) => {
self.push_persistence_error(
Vec::new(),
format!("failed to persist agent event entries: {error}"),
);
Vec::new()
}
};
// The agent is `None` while a spawned run owns it, so checkpoint records are
// only written when the agent is back in the session.
if let Some(agent) = self.agent.as_ref() {
if let Err(error) =
persist_checkpoint_records(&mut self.session_mgr, &agent.checkpoint_state)
{
self.push_persistence_error(
persisted.clone(),
format!("failed to persist checkpoint records: {error}"),
);
}
}
persisted
}
/// Persists every event that is already waiting in the channel, without
/// blocking; stops at the first `try_recv` error.
fn drain_pending_events_for_persistence(&mut self) {
    loop {
        match self.handle.event_rx.try_recv() {
            Ok(event) => {
                self.persist_event_entries(&event);
            }
            Err(_) => break,
        }
    }
}
/// Queues a persistence warning for later delivery by `recv_event`.
///
/// The message reads `session persistence warning: <error>`, or
/// `session persistence warning after <a, b, …>: <error>` when `persisted`
/// names entries that were stored before the failure.
fn push_persistence_error(&mut self, persisted: Vec<&'static str>, error: String) {
    let mut message = String::from("session persistence warning");
    if !persisted.is_empty() {
        message.push_str(" after ");
        message.push_str(&persisted.join(", "));
    }
    message.push_str(": ");
    message.push_str(&error);
    self.pending_persistence_errors.push_back(message);
}
/// Removes and returns the oldest queued persistence warning, if any.
fn take_persistence_error(&mut self) -> Option<String> {
self.pending_persistence_errors.pop_front()
}
}
async fn resolve_api_key(auth_store: &mut AuthStore, provider: &str) -> Result<ApiKey> {
let result = match provider {
"openai-codex" => auth_store.resolve_chatgpt_oauth().await,
"anthropic" | "kimi-code" => auth_store.resolve_with_refresh(provider).await,
_ => auth_store.resolve(provider),
};
result.map_err(|e| Error::Config(format!("Auth failed for {provider}: {e}")))
}
/// Returns an OAuth-backed route to use instead of the plain provider route,
/// or `None` when neither rerouting rule applies.
///
/// The OpenAI rule is checked first and keeps the model id while switching the
/// provider to `"openai-codex"`. The Kimi rule switches to model `"kimi2.6"`
/// on provider `"kimi-code"`.
fn auth_preferred_oauth_route(
    provider_override: Option<&str>,
    api_key_override_present: bool,
    auth_store: &AuthStore,
    registry: &ModelRegistry,
    meta: &ModelMeta,
    provider_name: &str,
) -> Option<ResolvedRuntimeConnection> {
    let via_chatgpt = should_use_openai_chatgpt_route(
        provider_override,
        api_key_override_present,
        auth_store,
        registry,
        &meta.id,
        provider_name,
    );
    if via_chatgpt {
        Some(ResolvedRuntimeConnection {
            model_id: meta.id.clone(),
            provider_name: "openai-codex".to_string(),
        })
    } else if should_use_kimi_code_route(
        provider_override,
        api_key_override_present,
        auth_store,
        registry,
        meta,
        provider_name,
    ) {
        Some(ResolvedRuntimeConnection {
            model_id: "kimi2.6".to_string(),
            provider_name: "kimi-code".to_string(),
        })
    } else {
        None
    }
}
/// Decides whether an OpenAI request should go through the `"openai-codex"`
/// provider.
///
/// All of the following must hold: the provider override is absent or
/// `"openai"`; no API key override was supplied; the resolved provider is
/// `"openai"`; the store has no plain API key for `"openai"`; an OAuth
/// credential exists under `"openai"` or `"openai-codex"`; and the model is in
/// the built-in codex model list.
fn should_use_openai_chatgpt_route(
    provider_override: Option<&str>,
    api_key_override_present: bool,
    auth_store: &AuthStore,
    registry: &ModelRegistry,
    model_id: &str,
    provider_name: &str,
) -> bool {
    if !matches!(provider_override, None | Some("openai")) {
        return false;
    }
    if api_key_override_present || provider_name != "openai" {
        return false;
    }
    // A usable API key always wins over the OAuth route.
    if auth_store.resolve_api_key_only("openai").is_ok() {
        return false;
    }
    let has_oauth = auth_store.get_oauth("openai").is_some()
        || auth_store.get_oauth("openai-codex").is_some();
    has_oauth && codex_supports_model(registry, model_id)
}
/// Decides whether a Moonshot Kimi request should go through the
/// `"kimi-code"` provider.
///
/// All of the following must hold: the provider override is absent,
/// `"moonshot"` or `"kimi-code"`; no API key override was supplied; the
/// resolved provider is `"moonshot"`; the store has no plain API key for
/// `"moonshot"`; an OAuth credential exists under `"kimi-code"`; the registry
/// knows `"kimi2.6"`; and the model id is one of the Moonshot Kimi ids.
fn should_use_kimi_code_route(
    provider_override: Option<&str>,
    api_key_override_present: bool,
    auth_store: &AuthStore,
    registry: &ModelRegistry,
    meta: &ModelMeta,
    provider_name: &str,
) -> bool {
    if !matches!(
        provider_override,
        None | Some("moonshot") | Some("kimi-code")
    ) {
        return false;
    }
    if api_key_override_present || provider_name != "moonshot" {
        return false;
    }
    // A usable API key always wins over the OAuth route.
    if auth_store.resolve_api_key_only("moonshot").is_ok() {
        return false;
    }
    if auth_store.get_oauth("kimi-code").is_none() {
        return false;
    }
    registry.find("kimi2.6").is_some() && is_kimi_moonshot_model(&meta.id)
}
/// Returns true when `model_id` exactly matches one of the Moonshot-hosted
/// Kimi model ids listed below (case-sensitive).
fn is_kimi_moonshot_model(model_id: &str) -> bool {
    const MOONSHOT_KIMI_IDS: [&str; 6] = [
        "kimi-k2.6",
        "kimi-k2.5",
        "kimi-k2-0905-preview",
        "kimi-k2-turbo-preview",
        "kimi-k2-thinking",
        "kimi-k2-thinking-turbo",
    ];
    MOONSHOT_KIMI_IDS.contains(&model_id)
}
/// Produces a new `Model` with cloned metadata that shares the same provider
/// instance (the `Arc` is cloned, not the provider).
fn clone_model(model: &Model) -> Model {
    let meta = model.meta.clone();
    let provider = Arc::clone(&model.provider);
    Model { meta, provider }
}
/// Appends a `SessionCheckpointRecord` for every checkpoint in
/// `checkpoint_state` whose id is not already among the session's checkpoint
/// records, and returns the ids that were appended.
///
/// # Errors
/// Stops at and returns the first error from `append_checkpoint_record`.
fn persist_checkpoint_records(
    session_mgr: &mut SessionManager,
    checkpoint_state: &crate::tools::CheckpointState,
) -> Result<Vec<String>> {
    // Snapshot of the ids already stored, taken once before appending.
    let already_recorded: std::collections::HashSet<String> = session_mgr
        .checkpoint_records()
        .into_iter()
        .map(|stored| stored.checkpoint_id)
        .collect();

    let mut newly_persisted = Vec::new();
    for checkpoint in checkpoint_state.checkpoints() {
        if !already_recorded.contains(&checkpoint.id) {
            session_mgr.append_checkpoint_record(SessionCheckpointRecord {
                version: crate::session::CHECKPOINT_RECORD_VERSION,
                checkpoint_id: checkpoint.id.clone(),
                created_at: checkpoint.created_at,
                label: checkpoint.label.clone(),
                files: checkpoint
                    .files
                    .iter()
                    .map(|file| file.to_string_lossy().into_owned())
                    .collect(),
            })?;
            newly_persisted.push(checkpoint.id);
        }
    }
    Ok(newly_persisted)
}
/// Returns true when `model_id` appears in the built-in OpenAI codex model
/// list. The registry argument is accepted but not consulted.
fn codex_supports_model(_registry: &ModelRegistry, model_id: &str) -> bool {
    let codex_models = imp_llm::model::builtin_openai_codex_models();
    for candidate in codex_models.iter() {
        if candidate.id == model_id {
            return true;
        }
    }
    false
}
#[cfg(test)]
mod tests {
use super::*;
use imp_llm::{
auth::{ApiKey, AuthStore},
model::{Capabilities, ModelPricing},
provider::{Context, Provider, RequestOptions},
AssistantMessage, ContentBlock, ModelMeta, StopReason, StreamEvent, Usage,
};
use serde_json::json;
use tempfile::TempDir;
/// Test provider whose `stream` yields no events.
struct NoopProvider {
/// Models reported by `Provider::models`.
models: Vec<ModelMeta>,
}
/// Test provider that replays a fixed list of stream events on the first
/// `stream` call and nothing on later calls.
struct SingleResponseProvider {
/// Models reported by `Provider::models`.
models: Vec<ModelMeta>,
/// Events to replay; taken (left as `None`) by the first `stream` call.
events: std::sync::Mutex<Option<Vec<imp_llm::Result<StreamEvent>>>>,
}
// `Provider` implementation that produces an empty stream and an empty key.
#[async_trait::async_trait]
impl Provider for NoopProvider {
/// Returns an empty event stream regardless of the arguments.
fn stream(
&self,
_model: &Model,
_context: Context,
_options: RequestOptions,
_api_key: &str,
) -> std::pin::Pin<Box<dyn futures_core::Stream<Item = imp_llm::Result<StreamEvent>> + Send>>
{
Box::pin(futures::stream::empty())
}
/// Always succeeds with an empty key.
async fn resolve_auth(&self, _auth: &AuthStore) -> imp_llm::Result<ApiKey> {
Ok(String::new())
}
/// Fixed provider id: "noop".
fn id(&self) -> &str {
"noop"
}
/// Returns the models this provider was constructed with.
fn models(&self) -> &[ModelMeta] {
&self.models
}
}
// `Provider` implementation that replays its stored events exactly once.
#[async_trait::async_trait]
impl Provider for SingleResponseProvider {
/// Streams the stored events on the first call; later calls stream nothing
/// because the stored list has already been taken.
fn stream(
&self,
_model: &Model,
_context: Context,
_options: RequestOptions,
_api_key: &str,
) -> std::pin::Pin<Box<dyn futures_core::Stream<Item = imp_llm::Result<StreamEvent>> + Send>>
{
let events = self
.events
.lock()
.expect("single response provider lock")
.take()
.unwrap_or_default();
Box::pin(futures::stream::iter(events))
}
/// Always succeeds with an empty key.
async fn resolve_auth(&self, _auth: &AuthStore) -> imp_llm::Result<ApiKey> {
Ok(String::new())
}
/// Fixed provider id: "single-response".
fn id(&self) -> &str {
"single-response"
}
/// Returns the models this provider was constructed with.
fn models(&self) -> &[ModelMeta] {
&self.models
}
}
/// Builds the "test-model" fixture backed by a `NoopProvider`.
fn test_model() -> Model {
    let pricing = ModelPricing {
        input_per_mtok: 2.0,
        output_per_mtok: 4.0,
        cache_read_per_mtok: 0.5,
        cache_write_per_mtok: 1.0,
    };
    let capabilities = Capabilities {
        reasoning: false,
        images: false,
        tool_use: true,
    };
    let meta = ModelMeta {
        id: "test-model".into(),
        provider: "test-provider".into(),
        name: "Test Model".into(),
        context_window: 8192,
        max_output_tokens: 2048,
        pricing,
        capabilities,
    };
    let provider = NoopProvider {
        models: vec![meta.clone()],
    };
    Model {
        meta,
        provider: Arc::new(provider),
    }
}
/// Builds the "test-model" fixture backed by a `SingleResponseProvider` that
/// replays `events` on its first stream.
fn test_model_with_events(events: Vec<imp_llm::Result<StreamEvent>>) -> Model {
    let pricing = ModelPricing {
        input_per_mtok: 2.0,
        output_per_mtok: 4.0,
        cache_read_per_mtok: 0.5,
        cache_write_per_mtok: 1.0,
    };
    let capabilities = Capabilities {
        reasoning: false,
        images: false,
        tool_use: true,
    };
    let meta = ModelMeta {
        id: "test-model".into(),
        provider: "test-provider".into(),
        name: "Test Model".into(),
        context_window: 8192,
        max_output_tokens: 2048,
        pricing,
        capabilities,
    };
    let provider = SingleResponseProvider {
        models: vec![meta.clone()],
        events: std::sync::Mutex::new(Some(events)),
    };
    Model {
        meta,
        provider: Arc::new(provider),
    }
}
/// Builds an end-of-turn assistant message whose only content is the text
/// "done", with the given timestamp and usage.
fn test_assistant_message(timestamp: u64, usage: Option<Usage>) -> AssistantMessage {
    let content = vec![ContentBlock::Text {
        text: "done".into(),
    }];
    AssistantMessage {
        content,
        usage,
        stop_reason: StopReason::EndTurn,
        timestamp,
    }
}
/// Default options carry no model or token override, keep tools enabled and
/// request a new session.
#[test]
fn session_options_default_is_sensible() {
    let defaults = SessionOptions::default();
    assert!(defaults.model.is_none());
    assert!(defaults.max_tokens.is_none());
    assert!(!defaults.no_tools);
    assert!(matches!(defaults.session, SessionChoice::New));
}
/// With only an OAuth credential stored for "openai", a codex-supported model
/// is rerouted to the "openai-codex" provider.
#[test]
fn resolve_runtime_connection_prefers_openai_chatgpt_route_when_oauth_exists() {
    let dir = tempfile::tempdir().unwrap();
    let auth_path = dir.path().join("auth.json");
    let mut auth_store = AuthStore::new(auth_path);
    auth_store
        .store(
            "openai",
            imp_llm::auth::StoredCredential::OAuth(imp_llm::auth::OAuthCredential {
                access_token: "oauth-token".into(),
                refresh_token: "refresh-token".into(),
                expires_at: imp_llm::now() + 3600,
            }),
        )
        .unwrap();
    let registry = ModelRegistry::with_builtins();
    let resolved = resolve_runtime_connection(
        RuntimeConnectionIntent {
            model_hint: Some("gpt-5.4"),
            config_model: None,
            provider_override: Some("openai"),
            api_key_override_present: false,
        },
        &auth_store,
        &registry,
    )
    .unwrap();
    assert_eq!(resolved.model_id, "gpt-5.4");
    assert_eq!(resolved.provider_name, "openai-codex");
}
/// A forced non-OpenAI provider is kept as the resolved provider.
#[test]
fn resolve_runtime_connection_respects_forced_non_openai_provider() {
    let auth_path = PathBuf::from("/tmp/nonexistent-auth.json");
    let auth_store = AuthStore::new(auth_path);
    let registry = ModelRegistry::with_builtins();
    let resolved = resolve_runtime_connection(
        RuntimeConnectionIntent {
            model_hint: Some("gpt-5.4"),
            config_model: None,
            provider_override: Some("anthropic"),
            api_key_override_present: false,
        },
        &auth_store,
        &registry,
    )
    .unwrap();
    assert_eq!(resolved.provider_name, "anthropic");
}
/// A model outside the codex list stays on the "openai" provider even when an
/// OAuth credential is stored.
#[test]
fn resolve_runtime_connection_does_not_switch_when_model_is_not_codex_supported() {
    let dir = tempfile::tempdir().unwrap();
    let auth_path = dir.path().join("auth.json");
    let mut auth_store = AuthStore::new(auth_path);
    auth_store
        .store(
            "openai",
            imp_llm::auth::StoredCredential::OAuth(imp_llm::auth::OAuthCredential {
                access_token: "oauth-token".into(),
                refresh_token: "refresh-token".into(),
                expires_at: imp_llm::now() + 3600,
            }),
        )
        .unwrap();
    let registry = ModelRegistry::with_builtins();
    let resolved = resolve_runtime_connection(
        RuntimeConnectionIntent {
            model_hint: Some("gpt-4o"),
            config_model: None,
            provider_override: Some("openai"),
            api_key_override_present: false,
        },
        &auth_store,
        &registry,
    )
    .unwrap();
    assert_eq!(resolved.model_id, "gpt-4o");
    assert_eq!(resolved.provider_name, "openai");
}
/// An explicit API key override disables the OAuth reroute.
#[test]
fn resolve_runtime_connection_does_not_switch_when_api_key_override_is_present() {
    let dir = tempfile::tempdir().unwrap();
    let auth_path = dir.path().join("auth.json");
    let mut auth_store = AuthStore::new(auth_path);
    auth_store
        .store(
            "openai",
            imp_llm::auth::StoredCredential::OAuth(imp_llm::auth::OAuthCredential {
                access_token: "oauth-token".into(),
                refresh_token: "refresh-token".into(),
                expires_at: imp_llm::now() + 3600,
            }),
        )
        .unwrap();
    let registry = ModelRegistry::with_builtins();
    let resolved = resolve_runtime_connection(
        RuntimeConnectionIntent {
            model_hint: Some("gpt-5.4"),
            config_model: None,
            provider_override: None,
            api_key_override_present: true,
        },
        &auth_store,
        &registry,
    )
    .unwrap();
    assert_eq!(resolved.model_id, "gpt-5.4");
    assert_eq!(resolved.provider_name, "openai");
}
/// With only a "kimi-code" OAuth credential stored, the "kimi" hint is
/// rerouted to model "kimi2.6" on the "kimi-code" provider.
#[test]
fn resolve_runtime_connection_prefers_kimi_code_route_when_oauth_exists_without_api_key() {
    let dir = tempfile::tempdir().unwrap();
    let auth_path = dir.path().join("auth.json");
    let mut auth_store = AuthStore::new(auth_path);
    auth_store
        .store(
            "kimi-code",
            imp_llm::auth::StoredCredential::OAuth(imp_llm::auth::OAuthCredential {
                access_token: "oauth-token".into(),
                refresh_token: "refresh-token".into(),
                expires_at: imp_llm::now() + 3600,
            }),
        )
        .unwrap();
    let registry = ModelRegistry::with_builtins();
    let resolved = resolve_runtime_connection(
        RuntimeConnectionIntent {
            model_hint: Some("kimi"),
            config_model: None,
            provider_override: None,
            api_key_override_present: false,
        },
        &auth_store,
        &registry,
    )
    .unwrap();
    assert_eq!(resolved.model_id, "kimi2.6");
    assert_eq!(resolved.provider_name, "kimi-code");
}
/// A stored Moonshot API key keeps the "kimi" hint on the "moonshot" provider
/// even when a "kimi-code" OAuth credential is also stored.
#[test]
fn resolve_runtime_connection_keeps_moonshot_kimi_when_api_key_exists() {
    let dir = tempfile::tempdir().unwrap();
    let auth_path = dir.path().join("auth.json");
    let mut auth_store = AuthStore::new(auth_path);
    auth_store
        .store(
            "moonshot",
            imp_llm::auth::StoredCredential::ApiKey {
                key: "sk-moonshot".into(),
            },
        )
        .unwrap();
    auth_store
        .store(
            "kimi-code",
            imp_llm::auth::StoredCredential::OAuth(imp_llm::auth::OAuthCredential {
                access_token: "oauth-token".into(),
                refresh_token: "refresh-token".into(),
                expires_at: imp_llm::now() + 3600,
            }),
        )
        .unwrap();
    let registry = ModelRegistry::with_builtins();
    let resolved = resolve_runtime_connection(
        RuntimeConnectionIntent {
            model_hint: Some("kimi"),
            config_model: None,
            provider_override: None,
            api_key_override_present: false,
        },
        &auth_store,
        &registry,
    )
    .unwrap();
    assert_eq!(resolved.model_id, "kimi-k2.6");
    assert_eq!(resolved.provider_name, "moonshot");
}
#[tokio::test]
async fn no_tools_session_surfaces_auth_failure_instead_of_empty_api_key() {
let tmp = TempDir::new().unwrap();
let cwd = tmp.path().join("project");
let auth_path = tmp.path().join("auth.json");
std::fs::create_dir_all(&cwd).unwrap();
let result = ImpSession::create(SessionOptions {
cwd: cwd.clone(),
auth_path: Some(auth_path),
provider: Some("openai-codex".into()),
model: Some("gpt-5.4".into()),
no_tools: true,
session: SessionChoice::InMemory,
..Default::default()
})
.await;
match result {
Ok(_) => panic!("missing auth should fail clearly"),
Err(Error::Config(message)) => {
assert!(message.contains("Auth failed for openai-codex"));
assert!(!message.contains("Incorrect API key provided: ''"));
}
Err(other) => panic!("expected config error, got {other:?}"),
}
}
/// A tool-less session created with a task still gets a non-empty system
/// prompt that mentions the task title and description.
#[tokio::test]
async fn no_tools_session_builds_assembled_system_prompt_when_task_present() {
    let tmp = TempDir::new().unwrap();
    let cwd = tmp.path().join("project");
    let auth_path = tmp.path().join("auth.json");
    std::fs::create_dir_all(&cwd).unwrap();

    let oauth = imp_llm::auth::OAuthCredential {
        access_token: "oauth-token".into(),
        refresh_token: "refresh-token".into(),
        expires_at: imp_llm::now() + 3600,
    };
    let mut auth_store = AuthStore::new(auth_path.clone());
    auth_store
        .store("openai", imp_llm::auth::StoredCredential::OAuth(oauth))
        .unwrap();

    let task = TaskContext {
        title: "Test task".into(),
        description: "Verify headless prompt assembly".into(),
        design: None,
        acceptance: Some("Prompt includes task guidance".into()),
        verify: None,
        verify_timeout_secs: None,
        fail_first: false,
        notes: None,
        attempts: vec![],
        dependencies: vec![],
        decisions: vec![],
        context_paths: vec![],
        constraints: vec![],
    };
    let session = ImpSession::create(SessionOptions {
        cwd: cwd.clone(),
        auth_path: Some(auth_path),
        provider: Some("openai".into()),
        model: Some("gpt-5.4".into()),
        no_tools: true,
        session: SessionChoice::InMemory,
        task: Some(task),
        ..Default::default()
    })
    .await
    .expect("no-tools session should build with saved auth");

    let agent = session.agent.as_ref().expect("agent present");
    let system_prompt = agent.system_prompt.clone();
    assert!(!system_prompt.trim().is_empty());
    assert!(system_prompt.contains("Test task"));
    assert!(system_prompt.contains("Verify headless prompt assembly"));
}
#[tokio::test]
async fn recv_event_returns_none_after_agent_end_even_if_sender_is_still_owned() {
let tmp = TempDir::new().unwrap();
let cwd = tmp.path().join("project");
let (agent, handle) = Agent::new(
clone_model(&test_model_with_events(vec![Ok(StreamEvent::MessageEnd {
message: AssistantMessage {
content: vec![ContentBlock::Text {
text: "done".into(),
}],
usage: None,
stop_reason: StopReason::EndTurn,
timestamp: 1,
},
})])),
cwd.clone(),
);
let mut session = ImpSession {
agent: Some(agent),
handle,
session_mgr: SessionManager::in_memory(),
config: Config::default(),
model: test_model_with_events(vec![Ok(StreamEvent::MessageEnd {
message: AssistantMessage {
content: vec![ContentBlock::Text {
text: "done".into(),
}],
usage: None,
stop_reason: StopReason::EndTurn,
timestamp: 1,
},
})]),
auth_store: AuthStore::new(tmp.path().join("auth.json")),
model_registry: ModelRegistry::with_builtins(),
cwd,
agent_task: None,
completed_run_result: None,
pending_persistence_errors: VecDeque::new(),
context_prefill: Vec::new(),
context_prefill_injected: false,
};
session.prompt("latest").await.unwrap();
while let Some(event) = session.recv_event().await {
if matches!(event, AgentEvent::AgentEnd { .. }) {
break;
}
}
let next = tokio::time::timeout(std::time::Duration::from_secs(1), session.recv_event())
.await
.expect("recv_event should not hang after agent end");
assert!(next.is_none());
session.wait().await.unwrap();
}
#[tokio::test]
async fn abort_marks_wait_as_cancelled() {
let tmp = TempDir::new().unwrap();
let cwd = tmp.path().join("project");
let (agent, handle) = Agent::new(
test_model_with_events(vec![Ok(StreamEvent::MessageEnd {
message: AssistantMessage {
content: vec![ContentBlock::Text {
text: "done".into(),
}],
usage: None,
stop_reason: StopReason::EndTurn,
timestamp: 1,
},
})]),
cwd.clone(),
);
let mut session = ImpSession {
agent: Some(agent),
handle,
session_mgr: SessionManager::in_memory(),
config: Config::default(),
model: test_model_with_events(vec![Ok(StreamEvent::MessageEnd {
message: AssistantMessage {
content: vec![ContentBlock::Text {
text: "done".into(),
}],
usage: None,
stop_reason: StopReason::EndTurn,
timestamp: 1,
},
})]),
auth_store: AuthStore::new(tmp.path().join("auth.json")),
model_registry: ModelRegistry::with_builtins(),
cwd,
agent_task: Some(tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
(
Agent::new(
test_model_with_events(vec![Ok(StreamEvent::MessageEnd {
message: AssistantMessage {
content: vec![ContentBlock::Text {
text: "done".into(),
}],
usage: None,
stop_reason: StopReason::EndTurn,
timestamp: 1,
},
})]),
PathBuf::from("/tmp"),
)
.0,
Ok(()),
)
})),
completed_run_result: None,
pending_persistence_errors: VecDeque::new(),
context_prefill: Vec::new(),
context_prefill_injected: false,
};
session.abort();
let result = session.wait().await;
assert!(matches!(result, Err(Error::Cancelled)));
}
#[tokio::test]
async fn prompt_uses_session_history_without_duplicate_active_prompt() {
let tmp = TempDir::new().unwrap();
let cwd = tmp.path().join("project");
let session_dir = tmp.path().join("sessions");
let model = test_model_with_events(vec![Ok(StreamEvent::MessageEnd {
message: AssistantMessage {
content: vec![ContentBlock::Text {
text: "done".into(),
}],
usage: None,
stop_reason: StopReason::EndTurn,
timestamp: 42,
},
})]);
let mut session_mgr = SessionManager::new(&cwd, &session_dir).unwrap();
session_mgr
.append(SessionEntry::Message {
id: "existing-user".into(),
parent_id: None,
message: imp_llm::Message::user("earlier"),
})
.unwrap();
let (agent, handle) = Agent::new(clone_model(&model), cwd.clone());
let mut session = ImpSession {
agent: Some(agent),
handle,
session_mgr,
config: Config::default(),
model,
auth_store: AuthStore::new(tmp.path().join("auth.json")),
model_registry: ModelRegistry::with_builtins(),
cwd,
agent_task: None,
completed_run_result: None,
pending_persistence_errors: VecDeque::new(),
context_prefill: Vec::new(),
context_prefill_injected: false,
};
session.prompt("latest").await.unwrap();
while let Some(event) = session.recv_event().await {
if matches!(event, AgentEvent::AgentEnd { .. }) {
break;
}
}
session.wait().await.unwrap();
let messages: Vec<_> = session.session_mgr.get_active_messages();
assert_eq!(messages.len(), 3);
match &messages[0] {
imp_llm::Message::User(user) => match user.content.as_slice() {
[ContentBlock::Text { text }] => assert_eq!(text, "earlier"),
other => panic!("unexpected user content: {other:?}"),
},
other => panic!("unexpected message: {other:?}"),
}
match &messages[1] {
imp_llm::Message::User(user) => match user.content.as_slice() {
[ContentBlock::Text { text }] => assert_eq!(text, "latest"),
other => panic!("unexpected user content: {other:?}"),
},
other => panic!("unexpected message: {other:?}"),
}
match &messages[2] {
imp_llm::Message::Assistant(assistant) => match assistant.content.as_slice() {
[ContentBlock::Text { text }] => assert_eq!(text, "done"),
other => panic!("unexpected assistant content: {other:?}"),
},
other => panic!("unexpected message: {other:?}"),
}
}
#[tokio::test]
async fn prompt_uses_compacted_active_history_for_follow_up_turns() {
let tmp = TempDir::new().unwrap();
let cwd = tmp.path().join("project");
let session_dir = tmp.path().join("sessions");
let model = test_model_with_events(vec![Ok(StreamEvent::MessageEnd {
message: AssistantMessage {
content: vec![ContentBlock::Text {
text: "follow-up done".into(),
}],
usage: None,
stop_reason: StopReason::EndTurn,
timestamp: 99,
},
})]);
let mut session_mgr = SessionManager::new(&cwd, &session_dir).unwrap();
session_mgr
.append(SessionEntry::Message {
id: "u1".into(),
parent_id: None,
message: imp_llm::Message::user("older request"),
})
.unwrap();
session_mgr
.append(SessionEntry::Message {
id: "a1".into(),
parent_id: None,
message: imp_llm::Message::Assistant(AssistantMessage {
content: vec![ContentBlock::Text {
text: "older answer".into(),
}],
usage: None,
stop_reason: StopReason::EndTurn,
timestamp: 1,
}),
})
.unwrap();
session_mgr
.append(SessionEntry::Message {
id: "u2".into(),
parent_id: None,
message: imp_llm::Message::user("recent request"),
})
.unwrap();
session_mgr
.append(SessionEntry::Compaction {
id: "c1".into(),
parent_id: None,
summary: "[CONTEXT COMPACTION] compacted summary".into(),
first_kept_id: "u2".into(),
tokens_before: 100,
tokens_after: 40,
})
.unwrap();
let (agent, handle) = Agent::new(clone_model(&model), cwd.clone());
let mut session = ImpSession {
agent: Some(agent),
handle,
session_mgr,
config: Config::default(),
model,
auth_store: AuthStore::new(tmp.path().join("auth.json")),
model_registry: ModelRegistry::with_builtins(),
cwd,
agent_task: None,
completed_run_result: None,
pending_persistence_errors: VecDeque::new(),
context_prefill: Vec::new(),
context_prefill_injected: false,
};
session.prompt("new follow-up").await.unwrap();
while let Some(event) = session.recv_event().await {
if matches!(event, AgentEvent::AgentEnd { .. }) {
break;
}
}
session.wait().await.unwrap();
let messages = session.session_mgr.get_active_messages();
assert_eq!(messages.len(), 4);
match &messages[0] {
imp_llm::Message::User(user) => match user.content.as_slice() {
[ContentBlock::Text { text }] => assert!(text.contains("CONTEXT COMPACTION")),
other => panic!("unexpected summary content: {other:?}"),
},
other => panic!("unexpected message: {other:?}"),
}
match &messages[1] {
imp_llm::Message::User(user) => match user.content.as_slice() {
[ContentBlock::Text { text }] => assert_eq!(text, "recent request"),
other => panic!("unexpected recent user content: {other:?}"),
},
other => panic!("unexpected message: {other:?}"),
}
match &messages[2] {
imp_llm::Message::User(user) => match user.content.as_slice() {
[ContentBlock::Text { text }] => assert_eq!(text, "new follow-up"),
other => panic!("unexpected follow-up content: {other:?}"),
},
other => panic!("unexpected message: {other:?}"),
}
}
// Persists a `TurnEnd` event whose assistant message carries usage and checks
// that both an assistant-message entry and a canonical usage entry are
// reported, and that the stored usage record has the expected metadata and
// per-category cost.
#[test]
fn persist_event_entries_writes_assistant_and_canonical_usage() {
    let scratch = TempDir::new().unwrap();
    let project_dir = scratch.path().join("project");
    let sessions_dir = scratch.path().join("sessions");
    let model = test_model();
    let session_mgr = SessionManager::new(&project_dir, &sessions_dir).unwrap();
    // The agent is not installed in the session; only its handle is used.
    let (_idle_agent, handle) = Agent::new(clone_model(&model), project_dir.clone());

    let mut session = ImpSession {
        agent: None,
        handle,
        session_mgr,
        config: Config::default(),
        model,
        auth_store: AuthStore::new(scratch.path().join("auth.json")),
        model_registry: ModelRegistry::with_builtins(),
        cwd: project_dir,
        agent_task: None,
        completed_run_result: None,
        pending_persistence_errors: VecDeque::new(),
        context_prefill: Vec::new(),
        context_prefill_injected: false,
    };

    let token_usage = Usage {
        input_tokens: 1_000,
        output_tokens: 250,
        cache_read_tokens: 100,
        cache_write_tokens: 50,
    };
    let assistant = test_assistant_message(123, Some(token_usage));
    let turn_end = AgentEvent::TurnEnd {
        index: 2,
        message: assistant.clone(),
        mana_review: crate::mana_review::TurnManaReview::no_change(2),
    };

    let persisted = session.persist_event_entries(&turn_end);
    assert_eq!(persisted, vec!["assistant message", "canonical usage"]);

    // Exactly one usage record is stored, tagged with turn/provider/model.
    let usage_records = session.session_mgr.usage_records();
    assert_eq!(usage_records.len(), 1);
    let stored = &usage_records[0];
    assert_eq!(stored.turn_index, Some(2));
    assert_eq!(stored.provider.as_deref(), Some("test-provider"));
    assert_eq!(stored.model.as_deref(), Some("test-model"));
    assert!(stored.request_id.starts_with("assistant:"));
    assert!(stored.assistant_message_id.is_some());

    // Cost components are compared with a small absolute tolerance.
    let tolerance = 1e-12;
    let pricing = stored.cost.as_ref().unwrap();
    assert!((pricing.input - 0.002).abs() < tolerance);
    assert!((pricing.output - 0.001).abs() < tolerance);
    assert!((pricing.cache_read - 0.00005).abs() < tolerance);
    assert!((pricing.cache_write - 0.00005).abs() < tolerance);
    assert!((pricing.total - 0.0031).abs() < tolerance);
}
// Persists a `TurnEnd` event whose assistant message has no usage and checks
// that only the assistant-message entry is reported and no usage record is
// stored.
#[test]
fn persist_event_entries_skips_usage_record_when_usage_missing() {
    let scratch = TempDir::new().unwrap();
    let project_dir = scratch.path().join("project");
    let sessions_dir = scratch.path().join("sessions");
    let model = test_model();
    let session_mgr = SessionManager::new(&project_dir, &sessions_dir).unwrap();
    // The agent is not installed in the session; only its handle is used.
    let (_idle_agent, handle) = Agent::new(clone_model(&model), project_dir.clone());

    let mut session = ImpSession {
        agent: None,
        handle,
        session_mgr,
        config: Config::default(),
        model,
        auth_store: AuthStore::new(scratch.path().join("auth.json")),
        model_registry: ModelRegistry::with_builtins(),
        cwd: project_dir,
        agent_task: None,
        completed_run_result: None,
        pending_persistence_errors: VecDeque::new(),
        context_prefill: Vec::new(),
        context_prefill_injected: false,
    };

    let turn_end = AgentEvent::TurnEnd {
        index: 0,
        message: test_assistant_message(456, None),
        mana_review: crate::mana_review::TurnManaReview::no_change(0),
    };
    let persisted = session.persist_event_entries(&turn_end);

    assert_eq!(persisted, vec!["assistant message"]);
    assert!(session.session_mgr.usage_records().is_empty());
}
// Snapshots a file through the agent's checkpoint state, modifies the file,
// then persists a `ToolExecutionEnd` event. Checks that a tool-result message
// entry is written, that one checkpoint record with the snapshot's id is
// recorded, and that restoring that checkpoint brings back the original
// file contents.
#[test]
fn persist_event_entries_writes_tool_results() {
    let scratch = TempDir::new().unwrap();
    let project_dir = scratch.path().join("project");
    let sessions_dir = scratch.path().join("sessions");
    let model = test_model();
    let session_mgr = SessionManager::new(&project_dir, &sessions_dir).unwrap();
    let (agent, handle) = Agent::new(clone_model(&model), project_dir.clone());

    // Create a tracked file and snapshot it before changing its contents.
    std::fs::create_dir_all(&project_dir).unwrap();
    let tracked = project_dir.join("tracked.rs");
    std::fs::write(&tracked, "original").unwrap();
    let snapshot = agent
        .checkpoint_state
        .snapshot_paths(
            std::slice::from_ref(&tracked),
            Some("before tool result".into()),
        )
        .unwrap()
        .unwrap();
    std::fs::write(&tracked, "modified").unwrap();

    let mut session = ImpSession {
        agent: Some(agent),
        handle,
        session_mgr,
        config: Config::default(),
        model,
        auth_store: AuthStore::new(scratch.path().join("auth.json")),
        model_registry: ModelRegistry::with_builtins(),
        cwd: project_dir,
        agent_task: None,
        completed_run_result: None,
        pending_persistence_errors: VecDeque::new(),
        context_prefill: Vec::new(),
        context_prefill_injected: false,
    };

    let tool_result = imp_llm::ToolResultMessage {
        tool_call_id: "call-1".into(),
        tool_name: "bash".into(),
        content: vec![ContentBlock::Text { text: "ok".into() }],
        is_error: false,
        details: json!({"exit_code": 0}),
        timestamp: 999,
    };
    let tool_end = AgentEvent::ToolExecutionEnd {
        tool_call_id: "call-1".into(),
        result: tool_result,
        provenance: None,
    };
    let persisted = session.persist_event_entries(&tool_end);
    assert_eq!(persisted, vec!["tool result"]);

    // A tool-result message must now be among the session entries.
    let has_tool_result = session.session_mgr.entries().iter().any(|entry| {
        matches!(
            entry,
            SessionEntry::Message {
                message: imp_llm::Message::ToolResult(_),
                ..
            }
        )
    });
    assert!(has_tool_result);

    // The pre-existing snapshot is recorded as the single checkpoint.
    let checkpoints = session.session_mgr.checkpoint_records();
    assert_eq!(checkpoints.len(), 1);
    assert_eq!(checkpoints[0].checkpoint_id, snapshot.id);

    // Restoring through the session manager reverts the tracked file.
    let retained_agent = session
        .agent
        .as_ref()
        .expect("agent retained for persistence test");
    let restored = session
        .session_mgr
        .restore_checkpoint(
            retained_agent.checkpoint_state.as_ref(),
            &checkpoints[0].checkpoint_id,
        )
        .unwrap();
    assert_eq!(restored, vec![tracked.clone()]);
    assert_eq!(std::fs::read_to_string(&tracked).unwrap(), "original");
}
}