1use crate::cli_runtime::{
2 build_provider, ensure_workspace_templates, session_channel_and_chat_id, CliRuntime,
3};
4use crate::client::ApiClient;
5use agent_diva_agent::{
6 agent_loop::SoulGovernanceSettings,
7 context::SoulContextSettings,
8 runtime_control::RuntimeControlCommand,
9 tool_config::network::{
10 NetworkToolConfig, WebFetchRuntimeConfig, WebRuntimeConfig, WebSearchRuntimeConfig,
11 },
12 AgentEvent, AgentLoop, ToolConfig,
13};
14use agent_diva_core::bus::MessageBus;
15use agent_diva_core::config::Config;
16use agent_diva_core::cron::CronService;
17use anyhow::Result;
18use console::style;
19use dialoguer::Input;
20use std::sync::Arc;
21use tokio::sync::mpsc;
22
23fn render_assistant_response(response: &str, _markdown: bool, show_header: bool) {
24 if show_header {
25 println!("\n{}", style("Response:").bold());
26 }
27 println!("{}", response);
28}
29
30pub fn build_network_tool_config(config: &Config) -> NetworkToolConfig {
31 let api_key = config.tools.web.search.api_key.trim().to_string();
32 NetworkToolConfig {
33 web: WebRuntimeConfig {
34 search: WebSearchRuntimeConfig {
35 provider: config.tools.web.search.provider.clone(),
36 enabled: config.tools.web.search.enabled,
37 api_key: if api_key.is_empty() {
38 None
39 } else {
40 Some(api_key)
41 },
42 max_results: config.tools.web.search.max_results,
43 },
44 fetch: WebFetchRuntimeConfig {
45 enabled: config.tools.web.fetch.enabled,
46 },
47 },
48 }
49}
50
51fn build_local_cli_agent(
52 runtime: &CliRuntime,
53 model: Option<String>,
54 with_runtime_control: bool,
55) -> Result<(
56 Config,
57 String,
58 AgentLoop,
59 Option<mpsc::UnboundedSender<RuntimeControlCommand>>,
60)> {
61 let config = runtime.load_config()?;
62 let selected_model = model.unwrap_or_else(|| config.agents.defaults.model.clone());
63 let workspace = runtime.effective_workspace(&config);
64 let _ = ensure_workspace_templates(&workspace)?;
65
66 let bus = MessageBus::new();
67 let provider = Arc::new(build_provider(&config, &selected_model)?);
68 let tool_config = ToolConfig {
69 network: build_network_tool_config(&config),
70 exec_timeout: config.tools.exec.timeout,
71 restrict_to_workspace: config.tools.restrict_to_workspace,
72 mcp_servers: config.tools.active_mcp_servers(),
73 cron_service: Some(Arc::new(CronService::new(runtime.cron_store_path(), None))),
74 soul_context: SoulContextSettings {
75 enabled: config.agents.soul.enabled,
76 max_chars: config.agents.soul.max_chars,
77 bootstrap_once: config.agents.soul.bootstrap_once,
78 },
79 notify_on_soul_change: config.agents.soul.notify_on_change,
80 soul_governance: SoulGovernanceSettings {
81 frequent_change_window_secs: config.agents.soul.frequent_change_window_secs,
82 frequent_change_threshold: config.agents.soul.frequent_change_threshold,
83 boundary_confirmation_hint: config.agents.soul.boundary_confirmation_hint,
84 },
85 };
86
87 let (runtime_control_tx, runtime_control_rx) = if with_runtime_control {
88 let (tx, rx) = mpsc::unbounded_channel();
89 (Some(tx), Some(rx))
90 } else {
91 (None, None)
92 };
93
94 let agent = AgentLoop::with_tools(
95 bus,
96 provider,
97 workspace,
98 Some(selected_model.clone()),
99 Some(config.agents.defaults.max_tool_iterations as usize),
100 tool_config,
101 runtime_control_rx,
102 );
103
104 Ok((config, selected_model, agent, runtime_control_tx))
105}
106
107async fn run_local_agent_turn(
108 agent: &mut AgentLoop,
109 message: &str,
110 session_key: &str,
111 markdown: bool,
112 logs: bool,
113 show_response_header: bool,
114) -> Result<()> {
115 let (channel, chat_id) = session_channel_and_chat_id(session_key);
116 if !logs {
117 let response = agent
118 .process_direct(message, session_key, channel, chat_id)
119 .await
120 .map_err(|err| anyhow::anyhow!("Failed to process message: {}", err))?;
121 render_assistant_response(&response, markdown, show_response_header);
122 return Ok(());
123 }
124
125 let (event_tx, mut event_rx) = mpsc::unbounded_channel::<AgentEvent>();
126 let mut response_fut = std::pin::pin!(agent.process_direct_stream(
127 message.to_string(),
128 session_key.to_string(),
129 channel.to_string(),
130 chat_id.to_string(),
131 event_tx,
132 ));
133 let mut completed = false;
134 let mut final_response = String::new();
135
136 loop {
137 tokio::select! {
138 result = &mut response_fut, if !completed => {
139 completed = true;
140 match result {
141 Ok(response) if final_response.is_empty() => final_response = response,
142 Ok(_) => {}
143 Err(err) => anyhow::bail!("Failed to process message: {}", err),
144 }
145 }
146 event = event_rx.recv() => {
147 match event {
148 Some(AgentEvent::AssistantDelta { text }) => {
149 print!("{}", text);
150 final_response.push_str(&text);
151 use std::io::Write;
152 let _ = std::io::stdout().flush();
153 }
154 Some(AgentEvent::ReasoningDelta { text }) => {
155 print!("{}", style(text).dim());
156 use std::io::Write;
157 let _ = std::io::stdout().flush();
158 }
159 Some(AgentEvent::ToolCallStarted { name, args_preview, .. }) => {
160 println!("\n{}", style(format!("[tool:start] {} {}", name, args_preview)).yellow());
161 }
162 Some(AgentEvent::ToolCallFinished { name, result, is_error, .. }) => {
163 let prefix = if is_error { "[tool:error]" } else { "[tool:done]" };
164 println!("\n{}", style(format!("{} {} {}", prefix, name, result)).yellow());
165 }
166 Some(AgentEvent::FinalResponse { content }) => {
167 if final_response.is_empty() {
168 final_response = content;
169 }
170 println!();
171 }
172 Some(AgentEvent::Error { message }) => anyhow::bail!("Failed to process message: {}", message),
173 Some(_) => {}
174 None if completed => break,
175 None => {}
176 }
177 }
178 }
179 }
180
181 if !final_response.is_empty() {
182 render_assistant_response(&final_response, markdown, show_response_header);
183 }
184
185 Ok(())
186}
187
188pub async fn run_agent(
189 runtime: &CliRuntime,
190 message: &str,
191 model: Option<String>,
192 session: Option<String>,
193 markdown: bool,
194 logs: bool,
195) -> Result<()> {
196 let (_config, _selected_model, mut agent, _runtime_control_tx) =
197 build_local_cli_agent(runtime, model, false)?;
198
199 let session_key = session.unwrap_or_else(|| "cli:direct".to_string());
200
201 if logs {
202 println!("{}", style("Processing...").cyan());
203 }
204 run_local_agent_turn(&mut agent, message, &session_key, markdown, logs, true).await
205}
206
207async fn run_remote_agent_turn(
208 client: &ApiClient,
209 message: &str,
210 session_key: &str,
211 markdown: bool,
212 logs: bool,
213 show_response_header: bool,
214) -> Result<()> {
215 let (channel, chat_id) = session_channel_and_chat_id(session_key);
216 let (event_tx, mut event_rx) = mpsc::unbounded_channel::<AgentEvent>();
217 let mut chat_fut = std::pin::pin!(client.chat_with_target(
218 message.to_string(),
219 Some(channel),
220 Some(chat_id),
221 event_tx,
222 ));
223 let mut completed = false;
224 let mut final_response = String::new();
225
226 loop {
227 tokio::select! {
228 result = &mut chat_fut, if !completed => {
229 completed = true;
230 result?;
231 }
232 event = event_rx.recv() => {
233 match event {
234 Some(AgentEvent::AssistantDelta { text }) => {
235 if logs {
236 print!("{}", text);
237 use std::io::Write;
238 let _ = std::io::stdout().flush();
239 }
240 final_response.push_str(&text);
241 }
242 Some(AgentEvent::ReasoningDelta { text }) => {
243 if logs {
244 print!("{}", style(text).dim());
245 use std::io::Write;
246 let _ = std::io::stdout().flush();
247 }
248 }
249 Some(AgentEvent::ToolCallStarted { name, args_preview, .. }) if logs => {
250 println!("\n{}", style(format!("[tool:start] {} {}", name, args_preview)).yellow());
251 }
252 Some(AgentEvent::ToolCallFinished { name, result, is_error, .. }) if logs => {
253 let prefix = if is_error { "[tool:error]" } else { "[tool:done]" };
254 println!("\n{}", style(format!("{} {} {}", prefix, name, result)).yellow());
255 }
256 Some(AgentEvent::FinalResponse { content }) => {
257 if final_response.is_empty() {
258 final_response = content;
259 }
260 if logs {
261 println!();
262 }
263 }
264 Some(AgentEvent::Error { message }) => anyhow::bail!("Remote error: {}", message),
265 Some(_) => {}
266 None if completed => break,
267 None => {}
268 }
269 }
270 }
271 }
272
273 if !final_response.is_empty() {
274 render_assistant_response(&final_response, markdown, show_response_header);
275 }
276
277 Ok(())
278}
279
280pub async fn run_agent_remote(
281 message: &str,
282 session: Option<String>,
283 markdown: bool,
284 logs: bool,
285 api_url: Option<String>,
286) -> Result<()> {
287 let client = ApiClient::new(api_url);
288 let session_key = session.unwrap_or_else(|| "cli:direct:remote".to_string());
289 if logs {
290 println!("{}", style("Processing (remote)...").cyan());
291 }
292 run_remote_agent_turn(&client, message, &session_key, markdown, logs, true).await
293}
294
295pub async fn run_chat(
296 runtime: &CliRuntime,
297 model: Option<String>,
298 session: Option<String>,
299 markdown: bool,
300 logs: bool,
301) -> Result<()> {
302 let (_config, selected_model, mut agent, runtime_control_tx) =
303 build_local_cli_agent(runtime, model, true)?;
304 let mut current_session = session.unwrap_or_else(|| "cli:chat".to_string());
305
306 println!("{}", style("Agent Diva Chat").bold().cyan());
307 println!(" model: {}", selected_model);
308 println!(" session: {}", current_session);
309 println!(" commands: /quit /clear /new /stop");
310
311 loop {
312 let input: String = Input::new()
313 .with_prompt("You")
314 .allow_empty(false)
315 .interact_text()?;
316 let command = input.trim();
317 if command.is_empty() {
318 continue;
319 }
320
321 match command {
322 "/quit" => break,
323 "/clear" => {
324 print!("\x1B[2J\x1B[H");
325 continue;
326 }
327 "/new" => {
328 current_session =
329 format!("cli:chat:{}", chrono::Local::now().format("%Y%m%d%H%M%S"));
330 println!("session -> {}", current_session);
331 continue;
332 }
333 "/stop" => {
334 if let Some(tx) = &runtime_control_tx {
335 let _ = tx.send(RuntimeControlCommand::StopSession {
336 session_key: current_session.clone(),
337 });
338 println!("{}", style("stop requested").yellow());
339 }
340 continue;
341 }
342 _ => {}
343 }
344
345 run_local_agent_turn(&mut agent, command, ¤t_session, markdown, logs, true).await?;
346 }
347
348 Ok(())
349}
350
351pub async fn run_chat_remote(
352 _model: Option<String>,
353 session: Option<String>,
354 markdown: bool,
355 logs: bool,
356 api_url: Option<String>,
357) -> Result<()> {
358 let client = ApiClient::new(api_url);
359 let mut current_session = session.unwrap_or_else(|| "cli:chat:remote".to_string());
360
361 println!("{}", style("Agent Diva Chat (remote)").bold().cyan());
362 println!(" session: {}", current_session);
363 println!(" commands: /quit /clear /new /stop");
364
365 loop {
366 let input: String = Input::new()
367 .with_prompt("You")
368 .allow_empty(false)
369 .interact_text()?;
370 let command = input.trim();
371 if command.is_empty() {
372 continue;
373 }
374
375 match command {
376 "/quit" => break,
377 "/clear" => {
378 print!("\x1B[2J\x1B[H");
379 continue;
380 }
381 "/new" => {
382 current_session = format!(
383 "cli:chat:remote:{}",
384 chrono::Local::now().format("%Y%m%d%H%M%S")
385 );
386 println!("session -> {}", current_session);
387 continue;
388 }
389 "/stop" => {
390 let (channel, chat_id) = session_channel_and_chat_id(¤t_session);
391 let stopped = client.stop(Some(channel), Some(chat_id)).await?;
392 println!(
393 "{}",
394 if stopped {
395 style("stop requested").yellow()
396 } else {
397 style("no running task for session").dim()
398 }
399 );
400 continue;
401 }
402 _ => {}
403 }
404
405 run_remote_agent_turn(&client, command, ¤t_session, markdown, logs, true).await?;
406 }
407
408 Ok(())
409}