use std::time::Duration;
use futures::StreamExt as _;
use tokio::sync::mpsc;
use tracing::warn;
use crate::{
agent::LiveStatus,
channel::OutboundMessage,
config::runtime::RuntimeConfig,
provider::{
LlmRequest, Message, MessageContent, Role, StreamEvent,
failover::FailoverManager,
registry::ProviderRegistry,
},
};
pub(crate) async fn try_preparse_locally(
text: &str,
handle: &crate::agent::AgentHandle,
) -> Option<OutboundMessage> {
use std::sync::atomic::Ordering;
let t = text.trim();
let lower = t.to_lowercase();
let txt = |s: String| OutboundMessage {
target_id: String::new(),
is_group: false,
text: s,
reply_to: None,
images: vec![],
files: vec![],
channel: None,
};
let workspace = || {
let base = crate::config::loader::base_dir();
handle.config.workspace.as_deref()
.map(std::path::PathBuf::from)
.unwrap_or_else(|| base.join("workspace"))
};
if lower == "/version" {
return Some(txt(format!("rsclaw v{}", option_env!("RSCLAW_BUILD_VERSION").unwrap_or("dev"))));
}
if lower == "/health" {
let secs = handle.started_at.elapsed().as_secs();
let uptime = if secs < 60 { format!("{secs}s") }
else if secs < 3600 { format!("{}m {}s", secs/60, secs%60) }
else { format!("{}h {}m", secs/3600, (secs%3600)/60) };
return Some(txt(format!("OK ยท up {uptime}")));
}
if lower == "/uptime" {
let secs = handle.started_at.elapsed().as_secs();
let s = if secs < 60 { format!("{secs}s") }
else if secs < 3600 { format!("{}m {}s", secs/60, secs%60) }
else { format!("{}h {}m", secs/3600, (secs%3600)/60) };
return Some(txt(s));
}
if lower == "/abort" {
let flags = handle.abort_flags.read().expect("abort_flags lock poisoned");
let count = flags.len();
for f in flags.values() { f.store(true, Ordering::SeqCst); }
return Some(txt(if count > 0 { format!("abort signal sent ({count} session(s))") } else { "nothing to abort".to_owned() }));
}
if lower == "/clear" {
let flags = handle.abort_flags.read().expect("abort_flags lock poisoned");
for f in flags.values() { f.store(true, Ordering::SeqCst); }
drop(flags);
handle.clear_signal.store(true, Ordering::SeqCst);
return Some(txt("Session cleared.".to_owned()));
}
if lower == "/new" {
let flags = handle.abort_flags.read().expect("abort_flags lock poisoned");
for f in flags.values() { f.store(true, Ordering::SeqCst); }
drop(flags);
handle.new_session_signal.store(true, Ordering::SeqCst);
return Some(txt("New session started.".to_owned()));
}
if lower == "/reset" {
let flags = handle.abort_flags.read().expect("abort_flags lock poisoned");
for f in flags.values() { f.store(true, Ordering::SeqCst); }
drop(flags);
handle.reset_signal.store(true, Ordering::SeqCst);
return Some(txt("Session reset.".to_owned()));
}
if lower == "/status" {
return Some(txt(handle.format_status()));
}
if lower == "/ls" || lower.starts_with("/ls ") {
let path_arg = t.get(3..).unwrap_or("").trim();
let ws = workspace();
let target = if path_arg.is_empty() {
ws
} else {
let p = std::path::PathBuf::from(path_arg);
if p.is_absolute() { p } else { ws.join(path_arg) }
};
let out = tokio::process::Command::new("ls")
.current_dir(&target)
.output()
.await
.ok()?;
let stdout = String::from_utf8_lossy(&out.stdout).into_owned();
return Some(txt(if stdout.trim().is_empty() { "(empty directory)".to_owned() } else { stdout }));
}
if lower.starts_with("/cat ") {
let path_arg = t.get(5..).unwrap_or("").trim();
let ws = workspace();
let target = {
let p = std::path::PathBuf::from(path_arg);
if p.is_absolute() { p } else { ws.join(path_arg) }
};
let content = tokio::fs::read_to_string(&target).await
.unwrap_or_else(|e| format!("error reading {}: {e}", target.display()));
return Some(txt(content));
}
if lower == "/ss" || lower == "/screenshot" {
let tmp_path = std::env::temp_dir().join("rsclaw_screen.png");
let tmp_s = tmp_path.to_string_lossy().to_string();
let ok = if cfg!(target_os = "macos") {
tokio::process::Command::new("screencapture")
.args(["-x", &tmp_s]).status().await.map(|s| s.success()).unwrap_or(false)
} else if cfg!(target_os = "windows") {
let script = format!(
r#"Add-Type -AssemblyName System.Windows.Forms,System.Drawing
$b=New-Object System.Drawing.Bitmap([System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Width,[System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Height)
$g=[System.Drawing.Graphics]::FromImage($b)
$g.CopyFromScreen(0,0,0,0,$b.Size)
$b.Save('{tmp_s}')
$g.Dispose();$b.Dispose()"#
);
{
#[cfg(target_os = "windows")]
let mut cmd = {
use std::os::windows::process::CommandExt;
let mut c = tokio::process::Command::new("powershell");
c.creation_flags(0x08000000);
c
};
#[cfg(not(target_os = "windows"))]
let mut cmd = tokio::process::Command::new("powershell");
cmd.args(["-NoProfile", "-WindowStyle", "Hidden", "-Command", &script])
.status().await.map(|s| s.success()).unwrap_or(false)
}
} else {
tokio::process::Command::new("scrot")
.arg(&tmp_s).status().await.map(|s| s.success()).unwrap_or(false)
};
if ok {
if let Ok(bytes) = tokio::fs::read(&tmp_path).await {
use base64::Engine;
let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
return Some(OutboundMessage {
target_id: String::new(),
is_group: false,
text: String::new(),
reply_to: None,
images: vec![format!("data:image/png;base64,{b64}")],
files: vec![],
channel: None,
});
}
}
return Some(txt("screenshot failed".to_owned()));
}
if lower == "/skill list" {
let base = crate::config::loader::base_dir();
let global_dir = base.join("skills");
let ws_dir = workspace().join("skills");
let scan = |dir: &std::path::Path| -> Vec<String> {
let mut names = Vec::new();
if let Ok(entries) = std::fs::read_dir(dir) {
for entry in entries.flatten() {
let p = entry.path();
if p.is_dir() && p.join("SKILL.md").exists() {
if let Some(name) = p.file_name().and_then(|n| n.to_str()) {
names.push(name.to_owned());
}
}
}
}
names.sort();
names
};
let global = scan(&global_dir);
let agent = scan(&ws_dir);
let mut lines = Vec::new();
lines.push(format!("System skills ({}):", global.len()));
if global.is_empty() {
lines.push(" (none)".to_owned());
} else {
for s in &global { lines.push(format!(" {s}")); }
}
lines.push(format!("Agent skills ({}):", agent.len()));
if agent.is_empty() {
lines.push(" (none)".to_owned());
} else {
for s in &agent { lines.push(format!(" {s}")); }
}
return Some(txt(lines.join("\n")));
}
if lower == "/cron" || lower == "/cron list" {
let jobs_path = crate::config::loader::base_dir().join("cron.json5");
let reply = match tokio::fs::read_to_string(&jobs_path).await {
Ok(content) => {
let parsed: Option<Vec<serde_json::Value>> = json5::from_str::<serde_json::Value>(&content)
.or_else(|_| serde_json::from_str(&content))
.ok()
.and_then(|v| {
v.get("jobs").and_then(|j| j.as_array().cloned())
.or_else(|| v.as_array().cloned())
});
match parsed {
Some(jobs) if jobs.is_empty() => "No cron jobs configured.".to_owned(),
Some(jobs) => {
let mut lines = vec!["Cron jobs:".to_owned()];
for job in &jobs {
let id = job.get("id").and_then(|v| v.as_str()).unwrap_or("?");
let schedule = job.get("schedule").and_then(|v| v.as_str()).unwrap_or("?");
let agent = job.get("agentId").and_then(|v| v.as_str()).unwrap_or("main");
let msg = job.get("message").and_then(|v| v.as_str()).unwrap_or("");
let enabled = job.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true);
let status = if enabled { "" } else { " (disabled)" };
let msg_preview = if msg.len() > 50 {
let end = msg.char_indices().nth(47).map(|(i, _)| i).unwrap_or(msg.len());
format!("{}...", &msg[..end])
} else {
msg.to_owned()
};
lines.push(format!(" [{}] {} -> {} \"{}\"{}",
id, schedule, agent, msg_preview, status));
}
lines.join("\n")
}
None => "No cron jobs configured.".to_owned(),
}
}
Err(_) => "No cron jobs configured.".to_owned(),
};
return Some(txt(reply));
}
if lower == "/model" || lower == "/models" {
let model = handle.config.model.as_ref()
.and_then(|m| m.primary.as_deref())
.unwrap_or("default");
let mut lines = vec![format!("Current model: {model}")];
lines.push(String::new());
lines.push("Registered providers:".to_owned());
for name in handle.providers.names() {
lines.push(format!(" {name}"));
}
return Some(txt(lines.join("\n")));
}
if lower.starts_with("/model ") {
let model = t.get(7..).unwrap_or("").trim();
return Some(txt(format!("Model switched to: {model} (runtime only, use configure to persist)")));
}
let shell_cmd: Option<&str> = if lower.starts_with("/run ")
|| lower.starts_with("/sh ")
|| lower.starts_with("/exec ")
{
t.find(' ').map(|i| t[i + 1..].trim())
} else if t.starts_with("! ") {
Some(t[2..].trim())
} else if t.starts_with("$ ") {
Some(t[2..].trim())
} else {
None
};
if let Some(cmd) = shell_cmd {
tracing::warn!(command = %cmd, "executing shell command via preparse (open dmPolicy)");
let (shell, arg) = if cfg!(target_os = "windows") {
("powershell", "-Command")
} else {
("sh", "-c")
};
let ws = workspace();
let mut proc = tokio::process::Command::new(shell);
proc.args([arg, cmd])
.current_dir(&ws)
.stdin(std::process::Stdio::null())
.kill_on_drop(true);
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
const CREATE_NO_WINDOW: u32 = 0x08000000;
proc.creation_flags(CREATE_NO_WINDOW);
}
let out = proc.output().await;
let reply = match out {
Ok(o) => {
let mut result = String::from_utf8_lossy(&o.stdout).into_owned();
let stderr = String::from_utf8_lossy(&o.stderr);
if !stderr.trim().is_empty() {
if !result.is_empty() { result.push('\n'); }
result.push_str(stderr.trim());
}
if result.trim().is_empty() {
if o.status.success() { "(no output)".to_owned() }
else { format!("exit {}", o.status.code().unwrap_or(-1)) }
} else { result }
}
Err(e) => format!("exec error: {e}"),
};
return Some(txt(reply));
}
None
}
pub(crate) fn is_fast_preparse(text: &str) -> bool {
let t = text.trim();
let lower = t.to_lowercase();
matches!(
lower.as_str(),
"/ls" | "/status" | "/version" | "/help" | "/?" | "/health" | "/uptime"
| "/model" | "/models" | "/cron" | "/clear" | "/new" | "/reset" | "/abort" | "/sessions"
)
|| lower.starts_with("/ls ")
|| lower.starts_with("/cat ")
|| lower.starts_with("/ss")
|| lower.starts_with("/remember ")
|| lower.starts_with("/recall ")
|| lower.starts_with("/cron ")
|| lower.starts_with("/skill ")
|| lower.starts_with("/model ")
|| lower.starts_with("/run ")
|| lower.starts_with("/sh ")
|| lower.starts_with("/exec ")
|| t.starts_with("! ")
|| t.starts_with("$ ")
}
pub(crate) fn processing_timeout(config: &RuntimeConfig) -> Duration {
let intermediate = config.raw.agents.as_ref()
.and_then(|a| a.defaults.as_ref())
.and_then(|d| d.intermediate_output)
.unwrap_or(true);
if intermediate {
return Duration::from_secs(86400);
}
let secs = config
.raw
.gateway
.as_ref()
.and_then(|g| g.processing_timeout)
.unwrap_or(120);
if secs == 0 {
Duration::from_secs(86400)
} else {
Duration::from_secs(secs)
}
}
pub(crate) async fn send_processing(
tx: &mpsc::Sender<OutboundMessage>,
target_id: String,
is_group: bool,
config: &RuntimeConfig,
) {
let i18n_lang = config
.raw
.gateway
.as_ref()
.and_then(|g| g.language.as_deref())
.map(crate::i18n::resolve_lang)
.unwrap_or("en");
let text = crate::i18n::t("processing", i18n_lang);
let _ = tokio::time::timeout(
Duration::from_secs(3),
tx.send(OutboundMessage {
target_id,
is_group,
text,
reply_to: None,
images: vec![],
channel: None,
files: vec![], }),
)
.await;
}
pub(crate) async fn btw_direct_call(
question: &str,
live_status: &tokio::sync::RwLock<LiveStatus>,
providers: &ProviderRegistry,
config: &RuntimeConfig,
) -> Option<String> {
let status_block = {
let status = live_status.read().await;
if status.state.is_empty() || status.state == "idle" {
String::new()
} else {
let elapsed = status
.started_at
.map(|s| s.elapsed().as_secs())
.unwrap_or(0);
format!(
"\n<main_agent_status>\nState: {}\nTask: {}\nElapsed: {}s\nRecent tools: {}\nResponse preview: {}\n</main_agent_status>",
status.state,
status.current_task,
elapsed,
status.tool_history.join(", "),
status.text_preview,
)
}
};
let model = config
.agents
.defaults
.model
.as_ref()
.and_then(|m| m.primary.as_deref())
.unwrap_or("anthropic/claude-sonnet-4-6");
let system = format!(
"You are answering a quick side question (/btw). Be concise and direct. \
You have no tools available. Answer from your general knowledge only.{}",
status_block
);
let req = LlmRequest {
model: model.to_owned(),
messages: vec![Message {
role: Role::User,
content: MessageContent::Text(question.to_owned()),
}],
tools: vec![],
system: Some(system),
max_tokens: Some(500),
temperature: None,
frequency_penalty: None,
thinking_budget: None, kv_cache_mode: 0, session_key: None,
};
let auth_order = config
.model
.auth
.as_ref()
.and_then(|a| a.order.clone())
.unwrap_or_default();
let mut failover = FailoverManager::new(auth_order, std::collections::HashMap::new(), vec![]);
let mut stream = match failover.call(req, providers).await {
Ok(s) => s,
Err(e) => {
warn!("/btw direct LLM call failed: {e:#}");
return None;
}
};
let mut text_buf = String::new();
while let Some(event) = stream.next().await {
match event {
Ok(StreamEvent::TextDelta(d)) => text_buf.push_str(&d),
Ok(StreamEvent::Done { .. }) | Ok(StreamEvent::Error(_)) => break,
Ok(_) => {}
Err(e) => {
warn!("/btw stream error: {e:#}");
break;
}
}
}
if text_buf.is_empty() {
None
} else {
let cleaned = crate::provider::openai::strip_think_tags_pub(&text_buf);
Some(cleaned)
}
}