mod chat;
mod providers;
mod streaming;
#[cfg(test)]
mod tests;
mod thinking;
pub mod types;
pub use types::{RigAgentLoopResult, RigAgentStatus};
use std::path::PathBuf;
use std::sync::Arc;
use rig::message::Message;
use rustc_hash::FxHashMap;
use crate::ast::AgentParams;
use crate::error::NikaError;
use crate::event::EventLog;
use crate::mcp::McpClient;
use crate::provider::rig::{AgentMediaStaging, NikaMcpTool, NikaMcpToolDef};
use crate::runtime::submit_tool::DynamicSubmitTool;
use crate::runtime::SkillInjector;
use crate::tools::{
EditTool, GlobTool, GrepTool, PermissionMode, ReadTool, ToolContext, WriteTool,
};
/// State for a single agent task: its parameters, registered tools, conversation
/// history and optional streaming / skill-injection configuration.
///
/// Constructed with [`RigAgentLoop::new`]; the optional parts are attached afterwards
/// with the `with_*` builder methods.
pub struct RigAgentLoop {
/// Identifier of the task this loop runs for; included in validation errors and
/// tracing fields.
task_id: String,
/// Agent parameters; `prompt` and `max_turns` are validated in `new`.
params: AgentParams,
/// Event log; clones are handed to the spawn tool and to the log/emit builtins.
event_log: EventLog,
/// MCP clients keyed by server name. Stored by `new` but not read in this part of
/// the file, hence the `dead_code` allowance.
#[allow(dead_code)] mcp_clients: FxHashMap<String, Arc<McpClient>>,
/// Every tool registered for this agent (MCP tools, spawn tool, builtins, submit tool).
tools: Vec<Arc<dyn rig::tool::ToolDyn>>,
/// Conversation messages; starts empty with capacity for `2 * max_turns`
/// (`max_turns` defaulting to 10).
history: Vec<Message>,
/// Turn counter; starts at 0.
turn_count: u32,
/// Channel for stream chunks; `None` until `with_stream_tx` is called.
stream_tx: Option<tokio::sync::mpsc::Sender<crate::provider::rig::StreamChunk>>,
/// Skill injector; set together with `skills_map` and `base_dir` by `with_skills`.
skill_injector: Option<Arc<SkillInjector>>,
/// Map passed to the skill injector; set by `with_skills`.
/// NOTE(review): presumably skill name -> path or content; confirm against `SkillInjector`.
skills_map: Option<std::collections::HashMap<String, String>>,
/// Base directory passed to the skill injector; set by `with_skills`.
base_dir: Option<PathBuf>,
/// Media staging area shared with the MCP tools built for this agent; collected
/// and emptied by `drain_media`.
pub media_staging: AgentMediaStaging,
}
/// Compact debug view: prints the task id and parameters in full, but only the
/// sizes of the tool list, history and media staging area. Remaining fields are
/// elided (non-exhaustive output).
impl std::fmt::Debug for RigAgentLoop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RigAgentLoop");
        out.field("task_id", &self.task_id);
        out.field("params", &self.params);
        // Counts only: the tool objects and messages themselves are not printed.
        out.field("tool_count", &self.tools.len());
        out.field("history_len", &self.history.len());
        out.field("media_staged", &self.media_staging.len());
        out.finish_non_exhaustive()
    }
}
/// Wraps a shared `Arc<dyn ToolDyn>` in an owned value that itself implements
/// `ToolDyn`, so a shared tool can be handed out as a `Box<dyn ToolDyn>`.
struct ArcToolAdapter(Arc<dyn rig::tool::ToolDyn>);

/// Every method forwards straight to the wrapped tool.
impl rig::tool::ToolDyn for ArcToolAdapter {
    fn name(&self) -> String {
        let Self(inner) = self;
        inner.name()
    }

    fn definition<'a>(
        &'a self,
        prompt: String,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = rig::completion::ToolDefinition> + Send + 'a>,
    > {
        let Self(inner) = self;
        inner.definition(prompt)
    }

    fn call<'a>(
        &'a self,
        args: String,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<String, rig::tool::ToolError>> + Send + 'a>,
    > {
        let Self(inner) = self;
        inner.call(args)
    }
}
impl RigAgentLoop {
/// Creates an agent loop for `task_id`, validating `params` and assembling the tool set.
///
/// Tools are registered in this order:
/// 1. one tool per definition exposed by each MCP server named in `params.mcp`;
/// 2. the spawn-agent tool, when the depth limit allows nesting below depth 1;
/// 3. core builtins (`sleep`, `log`, `emit`, `assert`, `prompt`, `run`, `complete`);
/// 4. file builtins (`read`, `write`, `edit`, `glob`, `grep`).
///
/// Builtin selection is driven by `params.tools`:
/// - `"builtin"` enables every core and file builtin;
/// - with no `nika:*` entries, all core builtins are enabled and no file builtins;
/// - otherwise only the `nika:<name>` entries listed are enabled.
///
/// # Errors
/// - `NikaError::AgentValidationError` if the prompt is empty or `max_turns` is
///   outside `1..=100`.
/// - `NikaError::McpNotConnected` if a server named in `params.mcp` has no client
///   in `mcp_clients`.
pub fn new(
    task_id: String,
    params: AgentParams,
    event_log: EventLog,
    mcp_clients: FxHashMap<String, Arc<McpClient>>,
) -> Result<Self, NikaError> {
    use super::builtin::{
        AssertTool, CompleteTool, EmitTool, FileToolAdapter, LogTool, NikaBuiltinToolAdapter,
        PromptTool, RunTool, SleepTool,
    };

    // --- Validation -------------------------------------------------------
    if params.prompt.is_empty() {
        return Err(NikaError::AgentValidationError {
            reason: format!("Agent prompt cannot be empty (task: {})", task_id),
        });
    }
    match params.max_turns {
        Some(0) => {
            return Err(NikaError::AgentValidationError {
                reason: format!("max_turns must be at least 1 (task: {})", task_id),
            });
        }
        Some(turns) if turns > 100 => {
            return Err(NikaError::AgentValidationError {
                reason: format!("max_turns cannot exceed 100 (task: {})", task_id),
            });
        }
        _ => {}
    }

    // --- MCP tools --------------------------------------------------------
    // The staging map is shared with every MCP tool built below.
    let media_staging: AgentMediaStaging = Arc::new(dashmap::DashMap::new());
    let mut tools = Self::build_tools(&params.mcp, &mcp_clients, &media_staging)?;

    // --- Spawn tool -------------------------------------------------------
    // An agent built here is always at depth 1; nested agents are only offered
    // when the effective depth limit leaves room below it.
    let current_depth = 1_u32;
    let max_depth = params.effective_depth_limit();
    if current_depth < max_depth {
        let spawn_tool = super::spawn::SpawnAgentTool::with_mcp(
            current_depth,
            max_depth,
            Arc::from(task_id.as_str()),
            event_log.clone(),
            mcp_clients.clone(),
            params.mcp.clone(),
        );
        tools.push(Arc::new(spawn_tool));
    }

    // --- Builtin selection ------------------------------------------------
    let event_log_arc = Arc::new(event_log.clone());
    let task_id_arc: Arc<str> = task_id.as_str().into();

    let all_builtins_requested = params.tools.iter().any(|t| t == "builtin");
    let requested_nika_tools: Vec<&str> = params
        .tools
        .iter()
        .filter(|t| t.starts_with("nika:"))
        .map(|t| t.as_str())
        .collect();

    // True when `nika:<name>` was listed explicitly (compared without allocating).
    let is_listed = |name: &str| -> bool {
        requested_nika_tools
            .iter()
            .any(|requested| requested.strip_prefix("nika:") == Some(name))
    };
    // Core builtins: enabled by "builtin", by the absence of any `nika:*` filter,
    // or by an explicit `nika:<name>` entry.
    let wants_core = |name: &str| -> bool {
        all_builtins_requested || requested_nika_tools.is_empty() || is_listed(name)
    };
    // File builtins are opt-in: enabled only by "builtin" or an explicit entry.
    let wants_file = |name: &str| -> bool { all_builtins_requested || is_listed(name) };

    // --- Core builtins ----------------------------------------------------
    if wants_core("sleep") {
        tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(SleepTool))));
    }
    if wants_core("log") {
        tools.push(Arc::new(
            NikaBuiltinToolAdapter::new(Arc::new(LogTool))
                .with_event_log(Arc::clone(&event_log_arc), Arc::clone(&task_id_arc)),
        ));
    }
    if wants_core("emit") {
        tools.push(Arc::new(
            NikaBuiltinToolAdapter::new(Arc::new(EmitTool))
                .with_event_log(Arc::clone(&event_log_arc), Arc::clone(&task_id_arc)),
        ));
    }
    if wants_core("assert") {
        tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(AssertTool))));
    }
    if wants_core("prompt") {
        tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(
            PromptTool::default(),
        ))));
    }
    if wants_core("run") {
        tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(RunTool))));
    }
    if wants_core("complete") {
        tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(
            CompleteTool,
        ))));
    }

    // --- File builtins ----------------------------------------------------
    const FILE_TOOL_NAMES: [&str; 5] = ["read", "write", "edit", "glob", "grep"];
    if FILE_TOOL_NAMES.iter().any(|name| wants_file(*name)) {
        // The tool context is only built when at least one file tool is enabled.
        // Falls back to "." if the current directory cannot be determined.
        let working_dir = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
        // NOTE(review): file tools run with `PermissionMode::YoloMode`; presumably
        // this skips permission checks. Confirm this is intended for agents.
        let tool_ctx = Arc::new(ToolContext::new(working_dir, PermissionMode::YoloMode));

        if wants_file("read") {
            tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(
                FileToolAdapter::new(ReadTool::new(Arc::clone(&tool_ctx))),
            ))));
        }
        if wants_file("write") {
            tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(
                FileToolAdapter::new(WriteTool::new(Arc::clone(&tool_ctx))),
            ))));
        }
        if wants_file("edit") {
            tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(
                FileToolAdapter::new(EditTool::new(Arc::clone(&tool_ctx))),
            ))));
        }
        if wants_file("glob") {
            tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(
                FileToolAdapter::new(GlobTool::new(Arc::clone(&tool_ctx))),
            ))));
        }
        if wants_file("grep") {
            tools.push(Arc::new(NikaBuiltinToolAdapter::new(Arc::new(
                FileToolAdapter::new(GrepTool::new(tool_ctx)),
            ))));
        }
    }

    // Two history slots per turn; `max_turns` was validated to be at most 100 above.
    let history_capacity = params.max_turns.unwrap_or(10) as usize * 2;

    Ok(Self {
        task_id,
        params,
        event_log,
        mcp_clients,
        tools,
        history: Vec::with_capacity(history_capacity),
        turn_count: 0,
        stream_tx: None,
        skill_injector: None,
        skills_map: None,
        base_dir: None,
        media_staging,
    })
}
/// Builder step: attaches the sender used for stream chunks and returns the agent.
///
/// Any sender set earlier is replaced.
pub fn with_stream_tx(
    self,
    tx: tokio::sync::mpsc::Sender<crate::provider::rig::StreamChunk>,
) -> Self {
    let mut agent = self;
    agent.stream_tx = Some(tx);
    agent
}
/// Builder step: stores the skill injector together with the skills map and base
/// directory it is called with, then returns the agent.
///
/// All three values are required for skill injection, so they are always set together.
pub fn with_skills(
    self,
    injector: Arc<SkillInjector>,
    skills_map: std::collections::HashMap<String, String>,
    base_dir: PathBuf,
) -> Self {
    let mut agent = self;
    agent.skill_injector = Some(injector);
    agent.skills_map = Some(skills_map);
    agent.base_dir = Some(base_dir);
    agent
}
/// Builder step: registers a `DynamicSubmitTool` built from `schema` and returns the agent.
///
/// A schema without a top-level `"type"` key is wrapped in an object schema whose
/// single required property `result` holds the original schema; a warning is logged
/// when that happens.
pub fn with_structured_output(self, schema: serde_json::Value) -> Self {
    let mut agent = self;

    let has_type = schema.get("type").is_some();
    let effective_schema = if has_type {
        schema
    } else {
        tracing::warn!(
            task_id = %agent.task_id,
            "output.schema missing 'type' field, wrapping in object schema"
        );
        serde_json::json!({
            "type": "object",
            "properties": {
                "result": schema
            },
            "required": ["result"]
        })
    };

    agent
        .tools
        .push(Arc::new(DynamicSubmitTool::new(effective_schema)));
    tracing::debug!(
        task_id = %agent.task_id,
        "Added DynamicSubmitTool (submit_result) to agent tools"
    );
    agent
}
/// Builds the system preamble for the agent.
///
/// Starts from `params.system` (empty if unset). If any registered tool has a name
/// starting with `nika_`, an "Available Tools" section is appended with a one-line
/// description for each builtin that has one, followed by a usage hint. The hint
/// only tells the model to call `nika_complete` when that tool is actually
/// registered, so a filtered tool set never points at a missing tool.
///
/// If a skill injector, skills map and base directory are configured and
/// `params.skills` is non-empty, the preamble is passed through the injector and
/// its result is returned; otherwise the preamble is returned as built.
///
/// # Errors
/// Propagates any error returned by the skill injector.
async fn inject_skills_into_prompt(&self) -> Result<String, NikaError> {
    // One-line description for the builtins that have one; other `nika_*` tools
    // are left out of the listing.
    fn describe(tool_name: &str) -> Option<&'static str> {
        let line = match tool_name {
            "nika_read" => "- nika_read: Read file contents from disk\n",
            "nika_write" => "- nika_write: Create a NEW file (fails if exists)\n",
            "nika_edit" => "- nika_edit: Edit an EXISTING file by replacing text\n",
            "nika_glob" => "- nika_glob: Find files matching a pattern\n",
            "nika_grep" => "- nika_grep: Search file contents with regex\n",
            "nika_complete" => {
                "- nika_complete: Signal task completion with structured result\n"
            }
            "nika_log" => {
                "- nika_log: Emit a log message (for observability only, not output)\n"
            }
            "nika_emit" => "- nika_emit: Emit a named event with payload\n",
            "nika_run" => "- nika_run: Execute a sub-workflow\n",
            _ => return None,
        };
        Some(line)
    }

    let mut preamble = self
        .params
        .system
        .as_deref()
        .unwrap_or_default()
        .to_string();

    let nika_tool_names: Vec<String> = self
        .tools
        .iter()
        .map(|tool| tool.name())
        .filter(|name| name.starts_with("nika_"))
        .collect();

    if !nika_tool_names.is_empty() {
        preamble.push_str("\n\n## Available Tools\n");
        for line in nika_tool_names.iter().filter_map(|name| describe(name)) {
            preamble.push_str(line);
        }

        // Only mention nika_complete when it is registered; otherwise the model
        // would be told to call a tool that does not exist.
        let can_complete = nika_tool_names.iter().any(|name| name == "nika_complete");
        preamble.push_str(if can_complete {
            "\nUse the MOST SPECIFIC tool for each action. Call nika_complete when done.\n"
        } else {
            "\nUse the MOST SPECIFIC tool for each action.\n"
        });
    }

    // Skill injection needs all three pieces of configuration plus at least one skill.
    let (Some(injector), Some(skills_map), Some(base_dir)) =
        (&self.skill_injector, &self.skills_map, &self.base_dir)
    else {
        return Ok(preamble);
    };
    let Some(skill_names) = &self.params.skills else {
        return Ok(preamble);
    };
    if skill_names.is_empty() {
        return Ok(preamble);
    }

    let skill_refs: Vec<&str> = skill_names.iter().map(|s| s.as_str()).collect();
    // An empty preamble is passed to the injector as `None`.
    let preamble_ref = if preamble.is_empty() {
        None
    } else {
        Some(preamble.as_str())
    };
    injector
        .inject(preamble_ref, &skill_refs, skills_map, base_dir)
        .await
}
/// Returns the registered tools as boxed trait objects.
///
/// Each box holds an `ArcToolAdapter` around a clone of the shared `Arc`, so the
/// tools themselves are shared with the agent, not copied.
fn tools_as_boxed(&self) -> Vec<Box<dyn rig::tool::ToolDyn>> {
    let mut boxed: Vec<Box<dyn rig::tool::ToolDyn>> = Vec::with_capacity(self.tools.len());
    for tool in &self.tools {
        boxed.push(Box::new(ArcToolAdapter(Arc::clone(tool))));
    }
    boxed
}
/// Removes every entry from the media staging area and returns their content
/// blocks as one flat list.
///
/// Collection and removal happen in a single `retain` pass, so only entries whose
/// blocks were actually collected are removed. An entry staged while this runs is
/// either included in the result or left in place for the next call; it is never
/// discarded unseen.
pub fn drain_media(&self) -> Vec<crate::mcp::types::ContentBlock> {
    let mut drained = Vec::new();
    self.media_staging.retain(|_, blocks| {
        drained.extend(blocks.iter().cloned());
        // Returning `false` drops the entry once its blocks have been copied out.
        false
    });
    drained
}
/// Builds one `NikaMcpTool` for every tool definition exposed by the MCP servers
/// named in `mcp_names`.
///
/// Each tool gets a clone of its server's client and of the shared `media_staging`
/// handle. A missing description becomes an empty string, and a missing input
/// schema becomes `{"type": "object"}`.
///
/// # Errors
/// Returns `NikaError::McpNotConnected` for the first name in `mcp_names` that has
/// no entry in `mcp_clients`.
fn build_tools(
    mcp_names: &[String],
    mcp_clients: &FxHashMap<String, Arc<McpClient>>,
    media_staging: &AgentMediaStaging,
) -> Result<Vec<Arc<dyn rig::tool::ToolDyn>>, NikaError> {
    let mut registered: Vec<Arc<dyn rig::tool::ToolDyn>> = Vec::new();

    for server in mcp_names {
        let Some(client) = mcp_clients.get(server) else {
            return Err(NikaError::McpNotConnected {
                name: server.clone(),
            });
        };

        let definitions = client.get_tool_definitions();
        for def in definitions {
            let descriptor = NikaMcpToolDef {
                name: def.name.clone(),
                description: def.description.clone().unwrap_or_default(),
                input_schema: def
                    .input_schema
                    .clone()
                    .unwrap_or_else(|| serde_json::json!({"type": "object"})),
            };
            let mcp_tool = NikaMcpTool::with_media_staging(
                descriptor,
                Arc::clone(client),
                Arc::clone(media_staging),
            );
            registered.push(Arc::new(mcp_tool));
        }
    }

    Ok(registered)
}
pub fn tool_count(&self) -> usize {
self.tools.len()
}
}