use crate::auto_compaction::CompactionConfig;
use crate::extensions::{ExtensionContext, ExtensionContextBuilder, ExtensionRunner, InputEvent as ExtInputEvent, InputEventResult as ExtInputEventResult, SessionShutdownEvent, SessionShutdownReason};
use crate::session::{AgentMessage, SessionManager};
use crate::settings::{Settings, ThinkingLevel};
use anyhow::{Context, Result};
use oxi_agent::{Agent, AgentEvent, AgentState};
use oxi_ai::Message;
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use uuid::Uuid;
/// Events an `AgentSession` broadcasts to its subscribers through `emit`.
#[derive(Debug, Clone)]
pub enum SessionEvent {
/// Snapshot of both pending-message queues, sent by `emit_queue_update`.
QueueUpdate {
steering: Vec<String>,
follow_up: Vec<String>,
},
/// Sent immediately before a compaction run starts.
CompactionStart {
reason: CompactionReason,
},
/// Sent after a compaction run finishes, whether it succeeded or failed.
CompactionEnd {
reason: CompactionReason,
// `Some` when compaction succeeded, `None` when it failed.
result: Option<CompactionResult>,
// Every emit site in this file sets `aborted` and `will_retry` to `false`.
aborted: bool,
will_retry: bool,
// Error text when compaction failed, `None` on success.
error_message: Option<String>,
},
/// Sent by `set_session_name` with the new name.
SessionInfoChanged {
name: Option<String>,
},
/// Sent by `set_thinking_level` when the level actually changes.
ThinkingLevelChanged {
level: ThinkingLevel,
},
/// An event produced by the underlying agent, forwarded unchanged.
Agent(AgentEvent),
}
/// Why a compaction run was started.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionReason {
/// Requested explicitly through `AgentSession::compact`.
Manual,
/// Triggered by `check_auto_compaction` when the estimated token count exceeds the configured threshold.
Threshold,
// Not emitted anywhere in this file; presumably reserved for context-overflow recovery — TODO confirm.
Overflow,
}
/// Outcome of a successful compaction run.
#[derive(Debug, Clone)]
pub struct CompactionResult {
/// Summary text produced by the compaction manager.
pub summary: String,
/// Always `None` where this file constructs the result (`run_compaction`).
pub first_kept_entry_id: Option<Uuid>,
/// Token estimate of the agent state taken before the messages were replaced.
pub tokens_before: usize,
/// Free-form JSON; `run_compaction` fills in `compacted_count` and `summary_length`.
pub details: Option<serde_json::Value>,
}
/// One entry of the user-restricted model list that `cycle_model` prefers over the defaults.
#[derive(Debug, Clone)]
pub struct ScopedModel {
/// Provider part of the `provider/model_id` identifier.
pub provider: String,
/// Model part of the `provider/model_id` identifier.
pub model_id: String,
/// When `Some`, applied via `set_thinking_level` after switching to this model.
pub thinking_level: Option<ThinkingLevel>,
}
/// Describes the model that became active after a successful `cycle_model` call.
#[derive(Debug, Clone)]
pub struct ModelCycleResult {
pub provider: String,
pub model_id: String,
/// Thinking level in effect after the switch.
pub thinking_level: ThinkingLevel,
/// `true` when the model came from the scoped list, `false` when it came from the built-in defaults.
pub is_scoped: bool,
}
/// Per-call options accepted by `AgentSession::prompt`.
///
/// NOTE(review): within this file `prompt` only reads `streaming_behavior`;
/// the other fields are not consulted here.
#[derive(Debug, Clone)]
pub struct PromptOptions {
pub expand_templates: bool,
pub images: Vec<oxi_ai::ImageContent>,
/// How to queue the text if the agent is already streaming; `None` makes `prompt` fail in that case.
pub streaming_behavior: Option<StreamingBehavior>,
pub source: InputSource,
}
impl Default for PromptOptions {
fn default() -> Self {
Self {
expand_templates: true,
images: Vec::new(),
streaming_behavior: None,
source: InputSource::Interactive,
}
}
}
/// What `prompt` should do with new text when the agent is already streaming.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingBehavior {
/// Route the text to `AgentSession::steer`.
Steer,
/// Route the text to `AgentSession::follow_up`.
FollowUp,
}
/// Origin of a piece of input text.
///
/// `process_input_through_extensions` maps each variant one-to-one onto the
/// extension layer's own `InputSource`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputSource {
    Interactive,
    Extension,
    Rpc,
}

impl Default for InputSource {
    /// Input is treated as interactive unless stated otherwise.
    fn default() -> Self {
        InputSource::Interactive
    }
}
/// Aggregate counters returned by `AgentSession::session_stats`.
#[derive(Debug, Clone)]
pub struct SessionStats {
pub session_id: String,
pub user_messages: usize,
pub assistant_messages: usize,
/// Number of tool-call content blocks found inside assistant messages.
pub tool_calls: usize,
pub tool_results: usize,
/// Length of the agent's message list (all roles).
pub total_messages: usize,
pub tokens: TokenStats,
// `session_stats` in this file always reports 0.0 here.
pub cost: f64,
}
/// Token counters reported as part of `SessionStats`.
#[derive(Debug, Clone, Default)]
pub struct TokenStats {
pub input: usize,
pub output: usize,
/// `session_stats` fills this with `input + output`.
pub total: usize,
}
/// Session-level wrapper around an `Agent`: holds settings, the session
/// manager, subscriber callbacks, pending-message queues, compaction state
/// and an optional extension runner. Every mutable part sits behind an `Arc`
/// so that `clone_inner` can produce a second value sharing the same state.
pub struct AgentSession {
agent: Arc<Agent>,
settings: Arc<RwLock<Settings>>,
session_manager: Arc<RwLock<SessionManager>>,
// Callbacks invoked synchronously by `emit`; slots are never removed, only
// replaced by a no-op when their `SessionListenerGuard` is dropped.
listeners: Arc<RwLock<Vec<Box<dyn Fn(&SessionEvent) + Send + Sync>>>>,
// NOTE(review): `new` drops the matching receiver, so sends on this channel always fail (and are ignored).
event_tx: mpsc::UnboundedSender<SessionEvent>,
scoped_models: Arc<RwLock<Vec<ScopedModel>>>,
steering_messages: Arc<RwLock<VecDeque<String>>>,
follow_up_messages: Arc<RwLock<VecDeque<String>>>,
compaction_config: Arc<RwLock<CompactionConfig>>,
// Aborted and cleared by `abort_compaction`; nothing in this file ever stores `Some` here.
compaction_abort: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
// Reset to `false` by `reset`; not read anywhere in this file.
overflow_recovery_attempted: Arc<RwLock<bool>>,
// Initialised in `new`; the `session_id()` accessor reads from `session_manager` instead.
session_id: Arc<RwLock<String>>,
cwd: String,
// `true` while a `prompt_streaming` run is in flight.
streaming: Arc<AtomicBool>,
extension_runner: Arc<RwLock<Option<ExtensionRunner>>>,
}
impl AgentSession {
/// Builds a session around `agent`.
///
/// The compaction configuration starts from `CompactionConfig::default()`
/// with `enabled` taken from `settings.auto_compaction`; all queues start
/// empty and no extension runner is installed.
pub fn new(
    agent: Arc<Agent>,
    settings: Settings,
    session_manager: SessionManager,
    cwd: String,
) -> Self {
    // Read what we need from the inputs before they move behind their locks.
    let initial_session_id = session_manager.get_session_id();
    let auto_compaction = settings.auto_compaction;

    // NOTE(review): the receiving half is dropped when this function
    // returns, so `emit`'s send on `event_tx` can never be observed.
    let (event_tx, _unused_rx) = mpsc::unbounded_channel();

    let compaction_config = CompactionConfig {
        enabled: auto_compaction,
        ..CompactionConfig::default()
    };

    Self {
        agent,
        settings: Arc::new(RwLock::new(settings)),
        session_manager: Arc::new(RwLock::new(session_manager)),
        listeners: Arc::new(RwLock::new(Vec::new())),
        event_tx,
        scoped_models: Arc::new(RwLock::new(Vec::new())),
        steering_messages: Arc::new(RwLock::new(VecDeque::new())),
        follow_up_messages: Arc::new(RwLock::new(VecDeque::new())),
        compaction_config: Arc::new(RwLock::new(compaction_config)),
        compaction_abort: Arc::new(Mutex::new(None)),
        overflow_recovery_attempted: Arc::new(RwLock::new(false)),
        session_id: Arc::new(RwLock::new(initial_session_id)),
        cwd,
        streaming: Arc::new(AtomicBool::new(false)),
        extension_runner: Arc::new(RwLock::new(None)),
    }
}
/// Returns the agent's current model identifier (delegates to `Agent::model_id`).
pub fn model_id(&self) -> String {
self.agent.model_id()
}
/// Returns the agent's state as reported by `Agent::state`.
pub fn state(&self) -> AgentState {
self.agent.state()
}
/// Returns the thinking level currently stored in the settings.
pub fn thinking_level(&self) -> ThinkingLevel {
    let settings = self.settings.read();
    settings.thinking_level
}
/// Returns `true` while the streaming flag set by `prompt_streaming` is raised.
pub fn is_streaming(&self) -> bool {
self.streaming.load(Ordering::SeqCst)
}
/// Returns the message list taken from the agent's current state.
pub fn messages(&self) -> Vec<Message> {
    let snapshot = self.agent.state();
    snapshot.messages
}
/// Returns the session id as reported by the session manager.
pub fn session_id(&self) -> String {
    let manager = self.session_manager.read();
    manager.get_session_id()
}
/// Reports whether a compaction task handle is currently stored.
///
/// If the handle's mutex cannot be acquired without waiting, the session
/// is reported as compacting.
pub fn is_compacting(&self) -> bool {
    self.compaction_abort
        .try_lock()
        .map_or(true, |slot| slot.is_some())
}
/// Always returns `true`; no setting is consulted.
pub fn auto_retry_enabled(&self) -> bool {
true
}
/// Computes aggregate statistics over the agent's current message list.
///
/// Counts user, assistant and tool-result messages, counts tool-call
/// blocks inside assistant messages, and sums the `usage.input` /
/// `usage.output` token counts carried by each assistant message.
/// `cost` is not tracked here and is always `0.0`.
pub fn session_stats(&self) -> SessionStats {
    let state = self.agent.state();
    let mut user_messages = 0usize;
    let mut assistant_messages = 0usize;
    let mut tool_results = 0usize;
    let mut tool_calls = 0usize;
    let mut input_tokens = 0usize;
    let mut output_tokens = 0usize;

    for msg in &state.messages {
        match msg {
            Message::User(_) => user_messages += 1,
            Message::Assistant(a) => {
                assistant_messages += 1;
                tool_calls += a
                    .content
                    .iter()
                    .filter(|block| matches!(block, oxi_ai::ContentBlock::ToolCall(_)))
                    .count();
                // Previously these counters were never updated, so the
                // reported token totals were always zero.
                input_tokens = input_tokens.saturating_add(a.usage.input as usize);
                output_tokens = output_tokens.saturating_add(a.usage.output as usize);
            }
            Message::ToolResult(_) => tool_results += 1,
        }
    }

    SessionStats {
        session_id: self.session_id(),
        user_messages,
        assistant_messages,
        tool_calls,
        tool_results,
        total_messages: state.messages.len(),
        tokens: TokenStats {
            input: input_tokens,
            output: output_tokens,
            total: input_tokens.saturating_add(output_tokens),
        },
        cost: 0.0,
    }
}
/// Total number of queued steering and follow-up messages.
pub fn pending_message_count(&self) -> usize {
    let steering = self.steering_messages.read().len();
    let follow_up = self.follow_up_messages.read().len();
    steering + follow_up
}
/// Returns a copy of the steering queue, oldest entry first.
pub fn steering_messages(&self) -> Vec<String> {
    let queue = self.steering_messages.read();
    Vec::from(queue.clone())
}
/// Returns a copy of the follow-up queue, oldest entry first.
pub fn follow_up_messages(&self) -> Vec<String> {
    let queue = self.follow_up_messages.read();
    Vec::from(queue.clone())
}
/// Working directory this session was created with.
pub fn cwd(&self) -> &str {
    self.cwd.as_str()
}
/// Returns a copy of the scoped model list.
pub fn scoped_models(&self) -> Vec<ScopedModel> {
    let models = self.scoped_models.read();
    models.to_vec()
}
/// Whether automatic compaction is enabled in the compaction config.
pub fn auto_compaction_enabled(&self) -> bool {
    let config = self.compaction_config.read();
    config.enabled
}
/// Registers `listener` to be called for every emitted `SessionEvent`.
///
/// The returned guard must be kept alive: dropping it replaces the
/// listener's slot with a no-op, which ends the subscription.
pub fn subscribe(&self, listener: Box<dyn Fn(&SessionEvent) + Send + Sync>) -> SessionListenerGuard {
    let mut registry = self.listeners.write();
    // The slot index of the new listener is the length before pushing.
    let key = registry.len();
    registry.push(listener);
    drop(registry);

    SessionListenerGuard {
        listeners: Arc::clone(&self.listeners),
        key,
    }
}
/// Subscribes to session events through an unbounded channel.
///
/// Every event passed to `emit` is cloned into the returned receiver.
/// The listener is registered directly rather than through `subscribe`:
/// `subscribe` returns a `SessionListenerGuard` whose `Drop` replaces the
/// listener with a no-op, and discarding that guard here (as the previous
/// implementation did) unsubscribed the channel before it delivered a
/// single event.
///
/// The listener stays registered for the lifetime of the session; once the
/// receiver is dropped its sends fail and are ignored.
pub fn subscribe_channel(&self) -> mpsc::UnboundedReceiver<SessionEvent> {
    let (tx, rx) = mpsc::unbounded_channel();
    self.listeners
        .write()
        .push(Box::new(move |event: &SessionEvent| {
            let _ = tx.send(event.clone());
        }));
    rx
}
/// Delivers `event` to every registered listener, in registration order,
/// then offers it to the internal event channel (send errors are ignored).
///
/// Listeners run synchronously while the listener list is read-locked.
fn emit(&self, event: SessionEvent) {
    let registry = self.listeners.read();
    registry.iter().for_each(|listener| listener(&event));
    let _ = self.event_tx.send(event);
}
/// Emits a `QueueUpdate` carrying snapshots of both pending queues.
fn emit_queue_update(&self) {
    let steering = self.steering_messages();
    let follow_up = self.follow_up_messages();
    self.emit(SessionEvent::QueueUpdate { steering, follow_up });
}
/// Sends `text` to the agent and processes the events of the resulting run.
///
/// If the session is already streaming, the text is queued according to
/// `options.streaming_behavior` instead of starting a new run.
///
/// # Errors
///
/// Fails when the session is streaming and no streaming behaviour was
/// given, when no model is selected, or when the agent run or event
/// processing fails.
pub async fn prompt(&self, text: String, options: PromptOptions) -> Result<()> {
    if self.is_streaming() {
        return match options.streaming_behavior {
            Some(StreamingBehavior::Steer) => self.steer(text).await,
            Some(StreamingBehavior::FollowUp) => self.follow_up(text).await,
            None => {
                anyhow::bail!(
                    "Agent is already processing. Specify streaming_behavior to queue the message."
                );
            }
        };
    }

    if self.model_id().is_empty() {
        anyhow::bail!("No model selected");
    }

    // `text` is not needed afterwards, so it is moved rather than cloned.
    let (_response, events) = self.agent.run(text).await?;
    self.process_events(events).await
}
/// Starts an agent run for `text` in the background and returns a receiver
/// that yields the run's `AgentEvent`s as they are produced.
///
/// The streaming flag is raised before the task is spawned and lowered at
/// the end of the task's async block. Failures of the run (an `Err` result
/// or a join error) are reported to the receiver as `AgentEvent::Error`.
///
/// NOTE(review): the flag is only lowered on the path that reaches the end
/// of the async block; confirm nothing on that path can unwind past it.
pub fn prompt_streaming(
&self,
text: String,
) -> mpsc::UnboundedReceiver<AgentEvent> {
let (tx, rx) = mpsc::unbounded_channel();
self.streaming.store(true, Ordering::SeqCst);
let agent = Arc::clone(&self.agent);
let streaming = Arc::clone(&self.streaming);
// Runs on a blocking thread and drives a `LocalSet` from there, which lets
// the agent future be spawned with `spawn_local`.
// presumably because the agent future is not `Send` — TODO confirm
tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
let local = tokio::task::LocalSet::new();
local
.run_until(async move {
// Bounded channel between the agent and the relay loop below.
let (agent_tx, mut agent_rx) = mpsc::channel::<AgentEvent>(256);
let agent_for_task = Arc::clone(&agent);
let agent_handle = tokio::task::spawn_local(async move {
agent_for_task.run_with_channel(text, agent_tx).await
});
// Relay until every sender (the one moved into the agent run) is dropped;
// a closed receiver on the caller's side is ignored.
while let Some(event) = agent_rx.recv().await {
let _ = tx.send(event);
}
match agent_handle.await {
Ok(Ok(_response)) => {
}
Ok(Err(e)) => {
let _ = tx.send(AgentEvent::Error {
message: e.to_string(),
});
}
Err(join_err) => {
let _ = tx.send(AgentEvent::Error {
message: format!("Agent task failed: {}", join_err),
});
}
}
streaming.store(false, Ordering::SeqCst);
})
.await;
});
});
rx
}
/// Queues `text` as a steering message, announces the queue change and
/// adds the text as a user message on the agent state.
///
/// NOTE(review): `Agent::state()` appears to return an owned value here
/// (see `messages`, which moves a field out of it). If that value is a
/// snapshot, `add_user_message` below mutates a temporary and the agent
/// never sees the message — confirm against the `Agent` API.
pub async fn steer(&self, text: String) -> Result<()> {
    self.steering_messages.write().push_back(text.clone());
    self.emit_queue_update();
    self.agent.state().add_user_message(text);
    Ok(())
}
/// Queues `text` to be sent as a follow-up prompt and announces the queue
/// change. `process_events` drains this queue once a run has completed.
pub async fn follow_up(&self, text: String) -> Result<()> {
    // `text` is not used again, so it is moved into the queue rather than cloned.
    self.follow_up_messages.write().push_back(text);
    self.emit_queue_update();
    Ok(())
}
/// Records an abort request in the debug log.
///
/// NOTE(review): nothing is cancelled here — the body only logs.
pub async fn abort(&self) {
tracing::debug!("AgentSession::abort() requested");
}
/// Empties both pending queues, announces the (now empty) queues and
/// returns the removed messages as `(steering, follow_up)`.
pub fn clear_queue(&self) -> (Vec<String>, Vec<String>) {
    let drained_steering = Vec::from(std::mem::take(&mut *self.steering_messages.write()));
    let drained_follow_up = Vec::from(std::mem::take(&mut *self.follow_up_messages.write()));
    self.emit_queue_update();
    (drained_steering, drained_follow_up)
}
/// Switches the agent to `model_id` and records the change.
///
/// An id of the form `provider/model` (split at the first `/`) is logged
/// with the session manager and stored as default provider and model. An
/// id without `/` only updates the default model.
///
/// # Errors
///
/// Returns the error from `Agent::switch_model`; nothing is recorded then.
pub fn set_model(&self, model_id: &str) -> Result<()> {
    self.agent.switch_model(model_id)?;

    let provider_and_model = model_id.split_once('/');

    {
        let mut sm = self.session_manager.write();
        if let Some((provider, model)) = provider_and_model {
            sm.append_model_change(provider, model);
        }
    }

    let mut settings = self.settings.write();
    match provider_and_model {
        Some((provider, model)) => {
            settings.default_provider = Some(provider.to_string());
            settings.default_model = Some(model.to_string());
        }
        None => {
            settings.default_model = Some(model_id.to_string());
        }
    }
    Ok(())
}
/// Switches to the next or previous model.
///
/// The scoped model list is used when it is non-empty; otherwise the
/// built-in default list is used. Returns `None` when the chosen list has
/// at most one entry or the switch fails.
pub fn cycle_model(&self, direction: CycleDirection) -> Option<ModelCycleResult> {
    let scoped = self.scoped_models();
    if scoped.is_empty() {
        let defaults = default_model_list();
        if defaults.len() <= 1 {
            return None;
        }
        self.cycle_default_model(&defaults, direction, false)
    } else {
        self.cycle_scoped_model(&scoped, direction)
    }
}
/// Moves one step through `scoped`, wrapping around at either end.
///
/// The current position is found by comparing `provider/model_id` with
/// the agent's model id; an unknown current model counts as position 0.
/// A thinking level attached to the target entry is applied after the
/// switch. Returns `None` for lists of at most one entry or when the
/// switch fails (the failure is logged).
fn cycle_scoped_model(
    &self,
    scoped: &[ScopedModel],
    direction: CycleDirection,
) -> Option<ModelCycleResult> {
    let count = scoped.len();
    if count <= 1 {
        return None;
    }

    let active = self.model_id();
    let start = scoped
        .iter()
        .position(|candidate| format!("{}/{}", candidate.provider, candidate.model_id) == active)
        .unwrap_or(0);
    // Stepping back by one equals stepping forward by `count - 1`.
    let step = match direction {
        CycleDirection::Forward => 1,
        CycleDirection::Backward => count - 1,
    };
    let target = &scoped[(start + step) % count];

    let qualified = format!("{}/{}", target.provider, target.model_id);
    if let Err(e) = self.set_model(&qualified) {
        tracing::warn!("Failed to switch to scoped model {}: {}", qualified, e);
        return None;
    }

    if let Some(level) = target.thinking_level {
        self.set_thinking_level(level);
    }

    Some(ModelCycleResult {
        provider: target.provider.clone(),
        model_id: target.model_id.clone(),
        thinking_level: self.thinking_level(),
        is_scoped: true,
    })
}
/// Moves one step through `models` (pairs of provider and model),
/// wrapping around at either end.
///
/// An unknown current model counts as position 0. `_is_scoped` is unused;
/// the result always reports `is_scoped: false`. Returns `None` when the
/// switch fails (the failure is logged).
fn cycle_default_model(
    &self,
    models: &[(&str, &str)],
    direction: CycleDirection,
    _is_scoped: bool,
) -> Option<ModelCycleResult> {
    let active = self.model_id();
    let count = models.len();
    let start = models
        .iter()
        .position(|(p, m)| format!("{}/{}", p, m) == active)
        .unwrap_or(0);
    // Stepping back by one equals stepping forward by `count - 1`.
    let step = match direction {
        CycleDirection::Forward => 1,
        CycleDirection::Backward => count - 1,
    };
    let (provider, model) = models[(start + step) % count];

    let qualified = format!("{}/{}", provider, model);
    match self.set_model(&qualified) {
        Ok(()) => Some(ModelCycleResult {
            provider: provider.to_string(),
            model_id: model.to_string(),
            thinking_level: self.thinking_level(),
            is_scoped: false,
        }),
        Err(e) => {
            tracing::warn!("Failed to switch to model {}: {}", qualified, e);
            None
        }
    }
}
/// Replaces the scoped model list with `models`.
pub fn set_scoped_models(&self, models: Vec<ScopedModel>) {
    let mut slot = self.scoped_models.write();
    *slot = models;
}
/// Changes the thinking level.
///
/// Does nothing when `level` equals the current level. Otherwise stores it
/// in the settings, records the lower-cased `Debug` name with the session
/// manager and emits `ThinkingLevelChanged`. No lock is held while
/// listeners run.
pub fn set_thinking_level(&self, level: ThinkingLevel) {
    if self.thinking_level() == level {
        return;
    }

    self.settings.write().thinking_level = level;

    let label = format!("{:?}", level).to_lowercase();
    self.session_manager
        .write()
        .append_thinking_level_change(&label);

    self.emit(SessionEvent::ThinkingLevelChanged { level });
}
/// Advances to the next thinking level in the order None → Minimal →
/// Standard → Thorough → None and returns the new level.
///
/// A current level missing from that order counts as the first entry.
/// The result is always `Some`.
pub fn cycle_thinking_level(&self) -> Option<ThinkingLevel> {
    const ORDER: [ThinkingLevel; 4] = [
        ThinkingLevel::None,
        ThinkingLevel::Minimal,
        ThinkingLevel::Standard,
        ThinkingLevel::Thorough,
    ];

    let active = self.thinking_level();
    let position = ORDER
        .iter()
        .position(|candidate| *candidate == active)
        .unwrap_or(0);
    let upcoming = ORDER[(position + 1) % ORDER.len()];

    self.set_thinking_level(upcoming);
    Some(upcoming)
}
/// Runs a manual compaction, bracketing it with `CompactionStart` and
/// `CompactionEnd` events (reason `Manual`).
///
/// The end event carries the result on success and the error text on
/// failure; the run's own result is returned unchanged.
pub async fn compact(&self, custom_instructions: Option<String>) -> Result<CompactionResult> {
    let reason = CompactionReason::Manual;
    self.emit(SessionEvent::CompactionStart { reason });

    let outcome = self.run_compaction(custom_instructions).await;

    let (result, error_message) = match &outcome {
        Ok(done) => (Some(done.clone()), None),
        Err(err) => (None, Some(err.to_string())),
    };
    self.emit(SessionEvent::CompactionEnd {
        reason,
        result,
        aborted: false,
        will_retry: false,
        error_message,
    });

    outcome
}
/// Compacts the conversation when auto-compaction is enabled and the
/// estimated token count of the serialized messages exceeds
/// `threshold` percent of a fixed 128 000-token context window.
///
/// Emits `CompactionStart` / `CompactionEnd` (reason `Threshold`) around
/// the run; failures are logged and reported in the end event, never
/// returned.
// NOTE(review): `config.threshold` is cast with `as usize` and used as a
// percentage — confirm its type and unit in `CompactionConfig`.
async fn check_auto_compaction(&self) {
    let config = self.compaction_config.read().clone();
    if !config.enabled {
        return;
    }

    let state = self.agent.state();
    if state.messages.is_empty() {
        return;
    }

    let serialized = serde_json::to_string(&state.messages).unwrap_or_default();
    let estimated_tokens = oxi_ai::estimate_tokens(&serialized);
    let context_window = 128_000;
    let threshold = config.threshold as usize;
    if estimated_tokens <= (context_window * threshold / 100) {
        return;
    }

    tracing::info!(
        "Auto-compaction triggered: {} tokens > {}% of {}",
        estimated_tokens,
        threshold,
        context_window,
    );

    let reason = CompactionReason::Threshold;
    self.emit(SessionEvent::CompactionStart { reason });

    let (result, error_message) = match self.run_compaction(None).await {
        Ok(done) => (Some(done), None),
        Err(e) => {
            tracing::warn!("Auto-compaction failed: {}", e);
            (None, Some(format!("Auto-compaction failed: {}", e)))
        }
    };
    self.emit(SessionEvent::CompactionEnd {
        reason,
        result,
        aborted: false,
        will_retry: false,
        error_message,
    });
}
/// Asks the agent's compaction manager to compact the current messages
/// and, if it does, swaps in the kept messages and persists the session.
///
/// `_custom_instructions` is currently unused.
///
/// # Errors
///
/// Fails when there are fewer than three messages, when the compaction
/// manager fails, or when it reports that nothing needed compacting.
// NOTE(review): `replace_messages` is called on the value returned by
// `Agent::state()`. If that is a snapshot rather than a shared handle, the
// replacement is lost — confirm against the `Agent` API.
async fn run_compaction(&self, _custom_instructions: Option<String>) -> Result<CompactionResult> {
    let state = self.agent.state();
    let messages = state.messages.clone();
    if messages.len() < 3 {
        anyhow::bail!("Nothing to compact (session too small)");
    }

    let ctx = self
        .agent
        .compaction_manager()
        .compact_if_needed(&messages, None, state.estimate_tokens(), state.iteration)
        .await
        .context("Compaction failed")?
        .ok_or_else(|| anyhow::anyhow!("Nothing to compact"))?;

    // Estimated from the state captured before the messages are replaced.
    let tokens_before = state.estimate_tokens();
    let compacted_count = ctx.compacted_count;
    self.agent.state().replace_messages(ctx.kept_messages.clone());
    self.persist_session();

    let details = serde_json::json!({
        "compacted_count": compacted_count,
        "summary_length": ctx.summary.len(),
    });
    Ok(CompactionResult {
        summary: ctx.summary.clone(),
        first_kept_entry_id: None,
        tokens_before,
        details: Some(details),
    })
}
/// Aborts the stored compaction task, if any, and clears the slot.
pub async fn abort_compaction(&self) {
    if let Some(handle) = self.compaction_abort.lock().await.take() {
        handle.abort();
    }
}
/// Turns automatic compaction on or off, updating both the compaction
/// config and the settings.
pub fn set_auto_compaction_enabled(&self, enabled: bool) {
    let mut config = self.compaction_config.write();
    config.enabled = enabled;
    drop(config);

    let mut settings = self.settings.write();
    settings.auto_compaction = enabled;
}
/// Appends every agent message that has not been persisted yet to the
/// session manager, then records the new persisted count.
///
/// Messages at indexes below `persisted_count()` are assumed to be stored
/// already; only the tail is converted and appended.
///
/// NOTE(review): when the message list shrinks (for instance if compaction
/// replaces it), `persisted >= total` holds and nothing is appended until
/// the list grows past the old count — confirm this is intended.
fn persist_session(&self) {
let state = self.agent.state();
let messages = &state.messages;
let total = messages.len();
if total == 0 {
return;
}
let mut sm = self.session_manager.write();
let persisted = sm.persisted_count();
if persisted >= total {
return; }
// `persisted < total` here, so the slice start is in bounds.
for msg in &messages[persisted..] {
match msg {
Message::User(u) => {
// Block content is flattened to its text parts; non-text blocks are dropped.
let content = match &u.content {
oxi_ai::MessageContent::Text(t) => t.clone(),
oxi_ai::MessageContent::Blocks(blocks) => {
blocks
.iter()
.filter_map(|b| b.as_text())
.collect::<Vec<_>>()
.join("")
}
};
sm.append_message(AgentMessage::User {
content: crate::session::ContentValue::String(content),
});
}
Message::Assistant(a) => {
// Map each content block onto its session-storage counterpart.
let content_blocks: Vec<crate::session::AssistantContentBlock> = a
.content
.iter()
.map(|b| match b {
oxi_ai::ContentBlock::Text(t) => {
crate::session::AssistantContentBlock::Text {
text: t.text.clone(),
}
}
oxi_ai::ContentBlock::Thinking(t) => {
crate::session::AssistantContentBlock::Thinking {
thinking: t.thinking.clone(),
}
}
oxi_ai::ContentBlock::ToolCall(tc) => {
crate::session::AssistantContentBlock::ToolCall {
id: tc.id.clone(),
name: tc.name.clone(),
arguments: tc.arguments.clone(),
}
}
oxi_ai::ContentBlock::Image(img) => {
crate::session::AssistantContentBlock::ImageResult {
data: img.data.clone(),
media_type: img.mime_type.clone(),
}
}
// Unknown blocks are stored as text using their string form.
oxi_ai::ContentBlock::Unknown(v) => {
crate::session::AssistantContentBlock::Text {
text: v.to_string(),
}
}
})
.collect();
sm.append_message(AgentMessage::Assistant {
content: content_blocks,
provider: Some(a.provider.clone()),
model_id: Some(a.model.clone()),
usage: Some(crate::session::Usage {
input: Some(a.usage.input as i64),
output: Some(a.usage.output as i64),
cache_read: Some(a.usage.cache_read as i64),
cache_write: Some(a.usage.cache_write as i64),
total_tokens: Some(a.usage.total_tokens as i64),
}),
// Stored as the `Debug` rendering of the stop reason.
stop_reason: Some(format!("{:?}", a.stop_reason)),
});
}
Message::ToolResult(t) => {
// Only the text parts of a tool result are kept.
let content = t
.content
.iter()
.filter_map(|b| b.as_text())
.collect::<Vec<_>>()
.join("");
sm.append_message(AgentMessage::ToolResult {
content: crate::session::ContentValue::String(content),
tool_call_id: t.tool_call_id.clone(),
});
}
}
}
sm.set_persisted_count(total);
}
async fn process_events(&self, events: Vec<AgentEvent>) -> Result<()> {
for event in &events {
self.emit(SessionEvent::Agent(event.clone()));
let guard = self.extension_runner.read();
if let Some(runner) = guard.as_ref() {
runner.registry().emit_event(event);
match event {
AgentEvent::ToolCall { tool_call } => {
runner.emit_tool_call(&tool_call.name, &tool_call.arguments);
}
AgentEvent::ToolExecutionStart { tool_name, args, .. } => {
runner.emit_tool_call(tool_name, args);
}
AgentEvent::ToolExecutionEnd { tool_name, result, .. } => {
let tool_result = oxi_agent::AgentToolResult::success(&result.content);
runner.emit_tool_result_event(tool_name, &tool_result);
}
AgentEvent::Error { message } => {
let err = anyhow::anyhow!("{}", message);
runner.registry().emit_error(&err);
}
_ => {}
}
}
}
let has_complete = events.iter().any(|e| {
matches!(
e,
AgentEvent::AgentEnd { .. } | AgentEvent::Complete { .. }
)
});
if has_complete {
self.check_auto_compaction().await;
let follow_ups: Vec<String> = self.follow_up_messages.write().drain(..).collect();
if !follow_ups.is_empty() {
self.emit_queue_update();
for msg in follow_ups {
let _ = self.agent.run(msg).await;
}
}
}
self.persist_session();
Ok(())
}
/// Records `name` with the session manager and emits `SessionInfoChanged`.
///
/// The session-manager lock is released before listeners are notified,
/// matching `set_thinking_level`. The lock is not reentrant, so a listener
/// that reads the session manager (for example through `session_id`)
/// would otherwise deadlock.
pub fn set_session_name(&self, name: String) {
    self.session_manager.write().append_session_info(&name);
    self.emit(SessionEvent::SessionInfoChanged { name: Some(name) });
}
/// Resets the agent, clears the overflow-recovery flag and empties both
/// pending queues (which also emits a queue update).
pub fn reset(&self) {
    self.agent.reset();

    let mut attempted = self.overflow_recovery_attempted.write();
    *attempted = false;
    drop(attempted);

    let _ = self.clear_queue();
}
/// Returns another shared handle to the underlying agent.
pub fn agent_ref(&self) -> Arc<Agent> {
    self.agent.clone()
}
/// Creates a cloneable handle whose session shares all state with `self`.
pub fn clone_handle(&self) -> AgentSessionHandle {
    let shared = Arc::new(self.clone_inner());
    AgentSessionHandle { inner: shared }
}
fn clone_inner(&self) -> Self {
Self {
agent: Arc::clone(&self.agent),
settings: Arc::clone(&self.settings),
session_manager: Arc::clone(&self.session_manager),
listeners: Arc::clone(&self.listeners),
event_tx: self.event_tx.clone(),
scoped_models: Arc::clone(&self.scoped_models),
steering_messages: Arc::clone(&self.steering_messages),
follow_up_messages: Arc::clone(&self.follow_up_messages),
compaction_config: Arc::clone(&self.compaction_config),
compaction_abort: Arc::clone(&self.compaction_abort),
overflow_recovery_attempted: Arc::clone(&self.overflow_recovery_attempted),
session_id: Arc::clone(&self.session_id),
cwd: self.cwd.clone(),
streaming: Arc::clone(&self.streaming),
extension_runner: Arc::clone(&self.extension_runner),
}
}
/// Installs `runner` as the session's extension runner.
///
/// If a runner is already installed it first receives a shutdown event
/// (reason `Reload`), a session-end notification and an unload
/// notification. The new runner then receives a load notification with a
/// fresh extension context, followed by a session-start notification.
///
/// NOTE(review): the three steps take the lock separately, so a concurrent
/// caller can interleave between them.
pub fn set_extension_runner(&self, runner: ExtensionRunner) {
// Step 1: let the outgoing runner, if any, shut down.
{
let guard = self.extension_runner.read();
if let Some(existing) = guard.as_ref() {
let session_id = self.session_id();
let shutdown_event = SessionShutdownEvent {
reason: SessionShutdownReason::Reload,
target_session_file: None,
};
existing.emit_session_shutdown_event(&shutdown_event);
existing.registry().emit_session_end(&session_id);
existing.registry().emit_unload();
}
}
// Step 2: swap in the new runner.
{
let mut guard = self.extension_runner.write();
*guard = Some(runner);
}
// Step 3: announce load and session start to the runner now installed.
{
let guard = self.extension_runner.read();
if let Some(runner) = guard.as_ref() {
let ctx = self.build_extension_context();
runner.registry().emit_load(&ctx);
let session_id = self.session_id();
runner.registry().emit_session_start(&session_id);
}
}
tracing::debug!("ExtensionRunner installed into AgentSession");
}
/// Returns a read guard over the optional extension runner.
///
/// The guard holds the read lock for as long as it is alive.
pub fn extension_runner(&self) -> parking_lot::RwLockReadGuard<'_, Option<ExtensionRunner>> {
self.extension_runner.read()
}
/// Removes and returns the installed extension runner, if any.
///
/// Before removal the runner receives a shutdown event (reason `Quit`), a
/// session-end notification and an unload notification.
///
/// NOTE(review): the notification and the removal take the lock
/// separately, so the runner removed may differ from the one notified if
/// another caller swaps it in between.
pub fn take_extension_runner(&self) -> Option<ExtensionRunner> {
{
let guard = self.extension_runner.read();
if let Some(runner) = guard.as_ref() {
let session_id = self.session_id();
let shutdown_event = SessionShutdownEvent {
reason: SessionShutdownReason::Quit,
target_session_file: None,
};
runner.emit_session_shutdown_event(&shutdown_event);
runner.registry().emit_session_end(&session_id);
runner.registry().emit_unload();
}
}
self.extension_runner.write().take()
}
/// Builds an extension context from the session's working directory,
/// shared settings and current session id.
pub fn build_extension_context(&self) -> ExtensionContext {
    let root = PathBuf::from(&self.cwd);
    let shared_settings = Arc::clone(&self.settings);
    let session_id = self.session_id();

    ExtensionContextBuilder::new(root)
        .settings(shared_settings)
        .session_id(session_id)
        .build()
}
pub fn forward_event_to_extensions(&self, event: &AgentEvent) {
self.emit(SessionEvent::Agent(event.clone()));
let guard = self.extension_runner.read();
if let Some(runner) = guard.as_ref() {
runner.registry().emit_event(event);
match event {
AgentEvent::ToolCall { tool_call } => {
runner.emit_tool_call(&tool_call.name, &tool_call.arguments);
}
AgentEvent::ToolExecutionStart { tool_name, args, .. } => {
runner.emit_tool_call(tool_name, args);
}
AgentEvent::ToolExecutionEnd { tool_name, result, .. } => {
let tool_result = oxi_agent::AgentToolResult::success(&result.content);
runner.emit_tool_result_event(tool_name, &tool_result);
}
_ => {}
}
}
}
/// Whether the installed extension runner has handlers for `event_type`;
/// `false` when no runner is installed.
pub fn has_extension_handlers(&self, event_type: &str) -> bool {
    match self.extension_runner.read().as_ref() {
        Some(runner) => runner.has_handlers(event_type),
        None => false,
    }
}
/// Tools contributed by the installed extension runner; empty when no
/// runner is installed.
pub fn extension_tools(&self) -> Vec<Arc<dyn oxi_agent::AgentTool>> {
    match self.extension_runner.read().as_ref() {
        Some(runner) => runner.all_tools(),
        None => Vec::new(),
    }
}
/// Commands contributed by the installed extension runner; empty when no
/// runner is installed.
pub fn extension_commands(&self) -> Vec<crate::extensions::Command> {
    match self.extension_runner.read().as_ref() {
        Some(runner) => runner.all_commands(),
        None => Vec::new(),
    }
}
/// Runs the extension tool-call hook for `tool_name` with `params` and
/// returns its result; returns the default result when no runner is
/// installed.
pub fn emit_before_tool_call(
    &self,
    tool_name: &str,
    params: &serde_json::Value,
) -> crate::extensions::ToolCallEmitResult {
    match self.extension_runner.read().as_ref() {
        Some(runner) => runner.emit_tool_call(tool_name, params),
        None => crate::extensions::ToolCallEmitResult::default(),
    }
}
/// Runs the extension tool-result hook for `tool_name` with `result` and
/// returns its result; returns the default result when no runner is
/// installed.
pub fn emit_after_tool_result(
    &self,
    tool_name: &str,
    result: &oxi_agent::AgentToolResult,
) -> crate::extensions::ToolResultEmitResult {
    match self.extension_runner.read().as_ref() {
        Some(runner) => runner.emit_tool_result_event(tool_name, result),
        None => crate::extensions::ToolResultEmitResult::default(),
    }
}
/// Passes input text through the extension runner's input hook.
///
/// Returns `Continue` when no runner is installed; otherwise returns
/// whatever the runner's hook decides.
pub fn process_input_through_extensions(
    &self,
    text: &str,
    source: InputSource,
) -> ExtInputEventResult {
    let guard = self.extension_runner.read();
    let runner = match guard.as_ref() {
        Some(runner) => runner,
        None => return ExtInputEventResult::Continue,
    };

    // Translate the session-level source into the extension layer's enum.
    let ext_source = match source {
        InputSource::Interactive => crate::extensions::InputSource::Interactive,
        InputSource::Extension => crate::extensions::InputSource::Extension,
        InputSource::Rpc => crate::extensions::InputSource::Rpc,
    };
    let mut event = ExtInputEvent {
        text: text.to_owned(),
        source: ext_source,
    };
    runner.emit_input_event(&mut event)
}
/// Tells the extension registry that `msg` was sent; no-op without a runner.
pub fn notify_extensions_message_sent(&self, msg: &str) {
    let guard = self.extension_runner.read();
    let runner = match guard.as_ref() {
        Some(runner) => runner,
        None => return,
    };
    runner.registry().emit_message_sent(msg);
}
/// Tells the extension registry that `msg` was received; no-op without a runner.
pub fn notify_extensions_message_received(&self, msg: &str) {
    let guard = self.extension_runner.read();
    let runner = match guard.as_ref() {
        Some(runner) => runner,
        None => return,
    };
    runner.registry().emit_message_received(msg);
}
/// Sends a copy of the current settings to the extension registry;
/// no-op (and no copy) without a runner.
pub fn notify_extensions_settings_changed(&self) {
    let guard = self.extension_runner.read();
    let runner = match guard.as_ref() {
        Some(runner) => runner,
        None => return,
    };
    let snapshot = self.settings.read().clone();
    runner.registry().emit_settings_changed(&snapshot);
}
}
/// Handle returned by `AgentSession::subscribe`. Dropping it ends the
/// subscription by replacing the listener's slot with a no-op, so callers
/// must keep it alive for as long as they want events.
pub struct SessionListenerGuard {
// The session's shared listener list.
listeners: Arc<RwLock<Vec<Box<dyn Fn(&SessionEvent) + Send + Sync>>>>,
// Index of this subscription's slot in `listeners`.
key: usize,
}
impl Drop for SessionListenerGuard {
    /// Ends the subscription by overwriting the listener's slot with a
    /// no-op. The slot itself stays in place so that the indexes held by
    /// other guards remain valid.
    fn drop(&mut self) {
        if let Some(slot) = self.listeners.write().get_mut(self.key) {
            *slot = Box::new(|_| {});
        }
    }
}
/// Cheaply cloneable handle to an `AgentSession`, created by
/// `AgentSession::clone_handle`. Dereferences to the session.
#[derive(Clone)]
pub struct AgentSessionHandle {
inner: Arc<AgentSession>,
}
impl std::ops::Deref for AgentSessionHandle {
    type Target = AgentSession;

    /// Gives access to the wrapped session.
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
/// Direction in which `AgentSession::cycle_model` steps through a model list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CycleDirection {
    /// Step to the next entry, wrapping to the first after the last.
    Forward,
    /// Step to the previous entry, wrapping to the last before the first.
    Backward,
}

impl Default for CycleDirection {
    /// Cycling goes forward unless stated otherwise.
    fn default() -> Self {
        CycleDirection::Forward
    }
}
/// Built-in `(provider, model)` pairs that `cycle_model` steps through
/// when no scoped models are configured. The order is the cycling order.
fn default_model_list() -> Vec<(&'static str, &'static str)> {
    const DEFAULT_MODELS: [(&str, &str); 5] = [
        ("anthropic", "claude-sonnet-4-20250514"),
        ("anthropic", "claude-haiku-4-20250414"),
        ("openai", "gpt-4o"),
        ("openai", "gpt-4o-mini"),
        ("google", "gemini-2.0-flash"),
    ];
    DEFAULT_MODELS.to_vec()
}
// Unit tests for the plain data types and helpers in this file. None of
// them constructs an `AgentSession`.
#[cfg(test)]
mod tests {
use super::*;
// The default list is non-empty and contains the Anthropic Sonnet entry.
#[test]
fn test_default_model_list() {
let models = default_model_list();
assert!(!models.is_empty());
assert!(
models
.iter()
.any(|(p, m)| *p == "anthropic" && *m == "claude-sonnet-4-20250514")
);
}
#[test]
fn test_cycle_direction_default() {
assert_eq!(CycleDirection::default(), CycleDirection::Forward);
}
// Checks only the modular arithmetic used for cycling: stepping
// `levels.len()` times returns to the start. It does not call
// `cycle_thinking_level`.
#[test]
fn test_thinking_level_ordering() {
let levels = [
ThinkingLevel::None,
ThinkingLevel::Minimal,
ThinkingLevel::Standard,
ThinkingLevel::Thorough,
];
let mut current = 0;
for _ in 0..levels.len() {
current = (current + 1) % levels.len();
}
assert_eq!(current, 0); }
// Field round-trip for `ScopedModel`.
#[test]
fn test_scoped_model() {
let model = ScopedModel {
provider: "anthropic".to_string(),
model_id: "claude-sonnet-4-20250514".to_string(),
thinking_level: Some(ThinkingLevel::Standard),
};
assert_eq!(model.provider, "anthropic");
assert_eq!(model.model_id, "claude-sonnet-4-20250514");
}
// Equality between `CompactionReason` variants.
#[test]
fn test_compaction_reason() {
assert_eq!(CompactionReason::Manual, CompactionReason::Manual);
assert_ne!(CompactionReason::Manual, CompactionReason::Threshold);
assert_ne!(CompactionReason::Threshold, CompactionReason::Overflow);
}
#[test]
fn test_model_cycle_result() {
let result = ModelCycleResult {
provider: "openai".to_string(),
model_id: "gpt-4o".to_string(),
thinking_level: ThinkingLevel::Standard,
is_scoped: false,
};
assert!(!result.is_scoped);
}
// `SessionStats` has no `Default`; this builds one by hand with a default `TokenStats`.
#[test]
fn test_session_stats_default() {
let stats = SessionStats {
session_id: "test".to_string(),
user_messages: 0,
assistant_messages: 0,
tool_calls: 0,
tool_results: 0,
total_messages: 0,
tokens: TokenStats::default(),
cost: 0.0,
};
assert_eq!(stats.total_messages, 0);
}
#[test]
fn test_streaming_behavior() {
assert_eq!(StreamingBehavior::Steer, StreamingBehavior::Steer);
assert_ne!(StreamingBehavior::Steer, StreamingBehavior::FollowUp);
}
#[test]
fn test_input_source_default() {
assert_eq!(InputSource::default(), InputSource::Interactive);
}
// Pins every field of `PromptOptions::default()`.
#[test]
fn test_prompt_options_default() {
let opts = PromptOptions::default();
assert!(opts.expand_templates);
assert!(opts.images.is_empty());
assert!(opts.streaming_behavior.is_none());
assert_eq!(opts.source, InputSource::Interactive);
}
}