use rig::completion::Message;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::event::AgentEvent;
use crate::session::{MessageRole, Session};
/// Bundle of handles belonging to one agent run: the event receiver, the
/// task's join handle, and the two unit-signal senders.
///
/// `AgentRunner::install_into` moves every field into caller-owned slots.
pub struct AgentRunner {
/// Receiving end of the channel that carries `AgentEvent`s.
pub event_rx: mpsc::Receiver<AgentEvent>,
/// Join handle of the run's task (presumably the producer of the events —
/// confirm at the spawn site).
pub task: JoinHandle<()>,
/// Sender for unit "interject" signals; the receiving side is not visible here.
pub interject_tx: mpsc::Sender<()>,
/// Sender for unit "cancel" signals; the receiving side is not visible here.
pub cancel_tx: mpsc::Sender<()>,
}
impl AgentRunner {
    /// Consumes the runner and parks each of its handles in the matching
    /// caller-owned slot, then raises the running flag.
    ///
    /// Whatever a slot held before is overwritten (and therefore dropped).
    ///
    /// * `rx` — receives `event_rx`.
    /// * `abort` — receives the task's `JoinHandle`.
    /// * `interject` — receives `interject_tx`.
    /// * `cancel` — receives `cancel_tx`.
    /// * `is_running` — set to `true`.
    pub(crate) fn install_into(
        self,
        rx: &mut Option<mpsc::Receiver<AgentEvent>>,
        abort: &mut Option<JoinHandle<()>>,
        interject: &mut Option<mpsc::Sender<()>>,
        cancel: &mut Option<mpsc::Sender<()>>,
        is_running: &mut bool,
    ) {
        // Take the runner apart once, then hand the pieces out in field order.
        let Self {
            event_rx,
            task,
            interject_tx,
            cancel_tx,
        } = self;

        *rx = Some(event_rx);
        *abort = Some(task);
        *interject = Some(interject_tx);
        *cancel = Some(cancel_tx);
        *is_running = true;
    }
}
/// Guard over a running task: its `Drop` implementation sends one cancel
/// signal on `cancel_tx` and then aborts `task`.
pub(crate) struct AbortRunnerOnDrop {
/// Handle of the task that is aborted when the guard is dropped.
pub task: JoinHandle<()>,
/// Channel on which a unit cancel signal is attempted when the guard is dropped.
pub cancel_tx: mpsc::Sender<()>,
}
impl Drop for AbortRunnerOnDrop {
    /// Signals cancellation first, then aborts the task.
    fn drop(&mut self) {
        let Self { task, cancel_tx } = self;

        // Non-blocking and best-effort: a full or closed channel is not an
        // error worth surfacing while dropping, so the result is discarded.
        let _ = cancel_tx.try_send(());

        task.abort();
    }
}
/// Builds a one-line summary of `actions`: every distinct action name, in
/// order of first appearance, joined by `" · "`.
///
/// Returns an empty string for an empty slice.
pub(crate) fn summarize_actions(actions: &[String]) -> String {
    use std::collections::HashSet;

    // `HashSet::insert` returns `false` for a value already present, so the
    // filter keeps exactly the first occurrence of each name in one pass
    // (instead of re-scanning the accumulated list for every element).
    let mut seen: HashSet<&str> = HashSet::with_capacity(actions.len());
    actions
        .iter()
        .map(String::as_str)
        .filter(|name| seen.insert(*name))
        .collect::<Vec<&str>>()
        .join(" · ")
}
/// Translates the retained part of `session` into the `Message` sequence
/// used as model history.
///
/// * If `Session::compacted_context` yields a summary, it becomes a leading
///   system message prefixed with `[Previous conversation summary]`.
/// * Only `session.messages[first_kept..]` is converted.
/// * User and system entries map one-to-one.
/// * An assistant entry without tool calls becomes a plain assistant message.
/// * An assistant entry with tool calls becomes one assistant message holding
///   the (non-empty) text followed by every tool call, and is then followed
///   by one tool-result message per call.
///
/// # Panics
///
/// Panics if `first_kept` is greater than `session.messages.len()`.
pub fn convert_history(session: &Session) -> Vec<Message> {
    use rig::OneOrMany;
    use rig::completion::message::AssistantContent;

    let (summary, first_kept) = session.compacted_context();
    let mut history = Vec::new();

    if let Some(text) = summary {
        history.push(Message::system(format!(
            "[Previous conversation summary]\n{}",
            text
        )));
    }

    for entry in &session.messages[first_kept..] {
        match entry.role {
            MessageRole::User => history.push(Message::user(entry.content.to_string())),
            MessageRole::System => history.push(Message::system(entry.content.to_string())),
            MessageRole::Assistant if entry.tool_calls.is_empty() => {
                history.push(Message::assistant(entry.content.to_string()));
            }
            MessageRole::Assistant => {
                // Optional text part first, then one part per tool call.
                let text_part = (!entry.content.is_empty())
                    .then(|| AssistantContent::text(entry.content.to_string()));
                let mut parts: Vec<AssistantContent> = text_part
                    .into_iter()
                    .chain(entry.tool_calls.iter().map(|call| {
                        AssistantContent::tool_call(
                            call.id.clone(),
                            call.name.clone(),
                            call.args.clone(),
                        )
                    }))
                    .collect();

                // `parts` holds at least one tool call here, so `many` cannot
                // see an empty vector.
                let content = match parts.len() {
                    1 => OneOrMany::one(parts.pop().unwrap()),
                    _ => OneOrMany::many(parts).expect("non-empty parts vec"),
                };
                history.push(Message::Assistant { id: None, content });

                // Every tool call gets a result message right after the
                // assistant turn; non-completed states get a bracketed note.
                history.extend(entry.tool_calls.iter().map(|call| {
                    let body = match &call.state {
                        crate::session::ToolCallState::Completed { result } => result.clone(),
                        crate::session::ToolCallState::Interrupted => {
                            "[Tool execution was interrupted]".to_string()
                        }
                        crate::session::ToolCallState::Failed { error } => {
                            format!("[Tool error: {}]", error)
                        }
                    };
                    Message::tool_result(call.id.clone(), body)
                }));
            }
        }
    }

    history
}
/// Writes `value` to stdout as a single line of JSON and flushes.
///
/// Emission is best-effort: a value that fails to serialize is dropped, and
/// write or flush failures (for example a reader that closed the pipe) are
/// ignored. `println!` is avoided on purpose because it panics when the
/// write to stdout fails.
pub(crate) fn emit_stream_json_event(value: serde_json::Value) {
    use std::io::Write;

    if let Ok(line) = serde_json::to_string(&value) {
        // Hold the lock across write + flush so the line goes out as a unit.
        let stdout = std::io::stdout();
        let mut out = stdout.lock();
        let _ = writeln!(out, "{}", line);
        let _ = out.flush();
    }
}
/// Produces a lowercase, hyphenated, UUID-v4-shaped identifier
/// (`xxxxxxxx-xxxx-4xxx-[89ab]xxx-xxxxxxxxxxxx`).
///
/// The 128 bits come from two rounds of a 64-bit xorshift generator seeded
/// from the wall clock (nanoseconds since the Unix epoch), the process id,
/// and a per-process call counter. This is an identifier generator, not a
/// source of cryptographic randomness.
///
/// The counter ensures that two calls in the same process which observe the
/// same clock reading (coarse timers, back-to-back calls) still start from
/// different seeds and therefore return different ids.
pub(crate) fn uuid_v4_simple() -> String {
    use std::fmt::Write as _;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Number of ids handed out by this process so far.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);
    /// Odd multiplier used to spread the seed bits (2^64 / golden ratio).
    const GOLDEN: u64 = 0x9E37_79B9_7F4A_7C15;

    // Truncating the u128 nanosecond count to its low 64 bits is intentional:
    // only seed entropy is needed. A clock before the epoch falls back to 0.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0);
    let pid = u64::from(std::process::id());
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    let mut state = nanos
        .wrapping_add(seq)
        .wrapping_mul(GOLDEN)
        .wrapping_add(pid);
    // xorshift maps 0 to 0 forever; never start from the all-zero state.
    if state == 0 {
        state = GOLDEN;
    }

    let mut bytes = [0u8; 16];
    for chunk in bytes.chunks_exact_mut(8) {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        chunk.copy_from_slice(&state.to_le_bytes());
    }

    // Stamp the version (4) and variant (10xx) bits.
    bytes[6] = (bytes[6] & 0x0f) | 0x40;
    bytes[8] = (bytes[8] & 0x3f) | 0x80;

    // 32 hex digits plus hyphens before bytes 4, 6, 8 and 10.
    let mut out = String::with_capacity(36);
    for (i, byte) in bytes.iter().enumerate() {
        if matches!(i, 4 | 6 | 8 | 10) {
            out.push('-');
        }
        // Writing into a `String` cannot fail.
        let _ = write!(out, "{byte:02x}");
    }
    out
}
/// Outcome of `apply_response_hooks`.
#[cfg(feature = "plugin")]
#[derive(Debug, Default, PartialEq, Eq)]
pub(crate) struct ResponseHookResult {
/// Value returned by `PluginManager::take_pending_replace_result` after the
/// response hooks ran; `None` (the default) when nothing was pending.
pub replacement: Option<String>,
}
/// Runs the `on-prompt` hooks for `prompt` and returns the prompt text that
/// results from them.
///
/// Precedence, highest first:
/// 1. a pending prompt replacement (`take_pending_prompt_replace`) is
///    returned as-is, without the original prompt;
/// 2. a pending prompt (`take_pending_prompt`) is placed before the original
///    prompt, separated by a blank line;
/// 3. otherwise the non-empty `on-prompt` dispatch results, joined by
///    newlines, are placed before the original prompt the same way;
/// 4. with none of the above, `prompt` is returned unchanged.
///
/// A dispatch error is logged to stderr and treated as "no hint". Both
/// pending values are always taken from `mgr`, whichever branch wins.
#[cfg(feature = "plugin")]
pub(crate) fn resolve_prompt_with_hooks(
    prompt: &str,
    mgr: &mut crate::plugin::PluginManager,
) -> String {
    let ctx = format!(
        "@{{:prompt \"{}\"}}",
        crate::plugin::escape_janet_string(prompt)
    );

    let dispatched_hint: Option<String> = match mgr.dispatch("on-prompt", &ctx) {
        Err(e) => {
            eprintln!("[plugin] on-prompt error: {e}");
            None
        }
        Ok(results) => {
            if results.is_empty() {
                None
            } else {
                Some(results.join("\n"))
            }
        }
    };

    // A pending prompt wins over whatever the dispatch returned.
    let hint = mgr.take_pending_prompt().or(dispatched_hint);

    match mgr.take_pending_prompt_replace() {
        Some(replacement) => replacement,
        None => match hint {
            Some(h) => format!("{}\n\n{}", h, prompt),
            None => prompt.to_string(),
        },
    }
}
/// Fires the post-response hook sequence for `response` and reports any
/// replacement result left in `mgr`.
///
/// Steps, in order:
/// 1. dispatch `on-response` with the escaped response;
/// 2. dispatch `message-end` with the escaped response; on success, a
///    message rewrite taken from `mgr` supersedes the text to store;
/// 3. store the (possibly rewritten) text via `store_response`;
/// 4. take the pending replace-result — this becomes the return value;
/// 5. dispatch `on-complete`, then `prepare-next-run`, with an empty context.
///
/// A failing dispatch is logged to stderr and does not stop the sequence.
#[cfg(feature = "plugin")]
pub(crate) fn apply_response_hooks(
    response: &str,
    mgr: &mut crate::plugin::PluginManager,
) -> ResponseHookResult {
    let response_ctx = format!(
        "@{{:response \"{}\"}}",
        crate::plugin::escape_janet_string(response)
    );
    match mgr.dispatch("on-response", &response_ctx) {
        Ok(_) => {}
        Err(e) => eprintln!("[plugin] on-response error: {e}"),
    }

    let message_ctx = format!(
        "@{{:message \"{}\"}}",
        crate::plugin::escape_janet_string(response)
    );
    // The rewrite is only consulted when the `message-end` dispatch succeeded.
    let rewritten = match mgr.dispatch("message-end", &message_ctx) {
        Ok(_) => mgr.take_message_rewrite(),
        Err(e) => {
            eprintln!("[plugin] message-end error: {e}");
            None
        }
    };
    let stored = match rewritten {
        Some(text) => text,
        None => response.to_string(),
    };
    mgr.store_response(&stored);

    // Captured before the remaining hooks run.
    let replacement = mgr.take_pending_replace_result();

    match mgr.dispatch("on-complete", "@{}") {
        Ok(_) => {}
        Err(e) => eprintln!("[plugin] on-complete error: {e}"),
    }
    match mgr.dispatch("prepare-next-run", "@{}") {
        Ok(_) => {}
        Err(e) => eprintln!("[plugin] prepare-next-run error: {e}"),
    }

    ResponseHookResult { replacement }
}
/// Tests for the plugin-hook helpers; they need a live `PluginManager` and
/// are therefore only built with the `plugin` feature.
#[cfg(all(test, feature = "plugin"))]
mod plugin_hook_tests {
    use super::*;
    use crate::plugin::PluginManager;

    /// An `on-prompt` hook's return value is placed before the user's prompt.
    #[test]
    fn resolve_prompt_prepends_on_prompt_hint() {
        let mut mgr = PluginManager::try_new().unwrap();
        mgr.eval(r#"(defn style-hint [ctx] "ALWAYS USE TYPESCRIPT")"#)
            .unwrap();
        mgr.register("on-prompt", "style-hint");
        let out = resolve_prompt_with_hooks("write a function", &mut mgr);
        assert!(out.contains("ALWAYS USE TYPESCRIPT"));
        assert!(out.contains("write a function"));
        assert!(
            out.find("ALWAYS USE TYPESCRIPT").unwrap() < out.find("write a function").unwrap(),
            "hint must come before the prompt"
        );
    }

    /// A prompt requested through `harness/request-prompt` replaces the hint
    /// returned by the dispatch, while the original prompt is kept.
    #[test]
    fn resolve_prompt_request_prompt_overrides_hint() {
        let mut mgr = PluginManager::try_new().unwrap();
        mgr.eval(
            r#"(defn override [ctx]
(harness/request-prompt "from-request-prompt")
"from-dispatch")"#,
        )
        .unwrap();
        mgr.register("on-prompt", "override");
        let out = resolve_prompt_with_hooks("original", &mut mgr);
        assert!(out.contains("from-request-prompt"));
        assert!(out.contains("original"));
        assert!(!out.contains("from-dispatch"));
    }

    /// `harness/replace-prompt` substitutes the whole prompt.
    #[test]
    fn resolve_prompt_replace_prompt_substitutes_entirely() {
        let mut mgr = PluginManager::try_new().unwrap();
        mgr.eval(
            r#"(defn replace [ctx]
(harness/replace-prompt "ENTIRELY NEW PROMPT")
nil)"#,
        )
        .unwrap();
        mgr.register("on-prompt", "replace");
        let out = resolve_prompt_with_hooks("user typed this", &mut mgr);
        assert_eq!(out, "ENTIRELY NEW PROMPT");
        assert!(!out.contains("user typed this"));
    }

    /// With no hook registered the prompt passes through unchanged.
    #[test]
    fn resolve_prompt_no_hook_passthrough() {
        let mut mgr = PluginManager::try_new().unwrap();
        let out = resolve_prompt_with_hooks("just this", &mut mgr);
        assert_eq!(out, "just this");
    }

    /// `harness/replace-result` from an `on-response` hook surfaces as the
    /// returned replacement and leaves no pending next-model behind.
    #[test]
    fn apply_response_hooks_replace_result() {
        let mut mgr = PluginManager::try_new().unwrap();
        mgr.eval(
            r#"(defn wrap [ctx]
(harness/replace-result "WRAPPED")
nil)"#,
        )
        .unwrap();
        mgr.register("on-response", "wrap");
        let result = apply_response_hooks("raw response", &mut mgr);
        assert_eq!(result.replacement.as_deref(), Some("WRAPPED"));
        assert_eq!(mgr.take_pending_next_model(), None);
    }

    /// A model chosen in `prepare-next-run` stays in the manager for the
    /// caller to take; `apply_response_hooks` does not consume it.
    #[test]
    fn apply_response_hooks_set_next_model_left_in_manager() {
        let mut mgr = PluginManager::try_new().unwrap();
        mgr.eval(
            r#"(defn pick-model [ctx]
(harness/set-next-model "claude-opus-4-7")
nil)"#,
        )
        .unwrap();
        mgr.register("prepare-next-run", "pick-model");
        let _ = apply_response_hooks("ok", &mut mgr);
        assert_eq!(
            mgr.take_pending_next_model().as_deref(),
            Some("claude-opus-4-7")
        );
    }

    /// A rewrite issued from `message-end` is what gets stored, and the
    /// pending rewrite is consumed in the process.
    #[test]
    fn apply_response_hooks_fires_message_end_rewrite() {
        let mut mgr = PluginManager::try_new().unwrap();
        mgr.eval(
            r#"(defn rw [ctx]
(harness/rewrite-message "REWRITTEN-BY-MESSAGE-END")
nil)"#,
        )
        .unwrap();
        mgr.register("message-end", "rw");
        apply_response_hooks("original text", &mut mgr);
        let stored = mgr.eval("harness-response").unwrap();
        assert!(
            stored.contains("REWRITTEN-BY-MESSAGE-END"),
            "headless message-end rewrite must be stored; got {stored:?}"
        );
        assert_eq!(mgr.take_message_rewrite(), None);
    }

    /// With no hooks registered the result is the default (no replacement).
    #[test]
    fn apply_response_hooks_no_hooks_passthrough() {
        let mut mgr = PluginManager::try_new().unwrap();
        let result = apply_response_hooks("ok", &mut mgr);
        assert_eq!(result, ResponseHookResult::default());
    }
}

/// Tests for helpers that do not touch the plugin API. Kept outside the
/// feature-gated module so they also run when `plugin` is disabled.
#[cfg(test)]
mod runner_helper_tests {
    use super::*;

    /// `install_into` fills all four slots and raises the running flag.
    #[tokio::test]
    async fn install_into_populates_every_slot_and_marks_running() {
        let (_tx, event_rx) = mpsc::channel(1);
        let (interject_tx, _) = mpsc::channel(1);
        let (cancel_tx, _) = mpsc::channel(1);
        let runner = AgentRunner {
            event_rx,
            task: tokio::spawn(async {}),
            interject_tx,
            cancel_tx,
        };
        let (mut rx, mut abort, mut interject, mut cancel) = (None, None, None, None);
        let mut is_running = false;
        runner.install_into(
            &mut rx,
            &mut abort,
            &mut interject,
            &mut cancel,
            &mut is_running,
        );
        assert!(rx.is_some() && abort.is_some() && interject.is_some() && cancel.is_some());
        assert!(is_running);
    }

    /// Duplicates are dropped, first-occurrence order is kept, and the empty
    /// and single-element cases behave as expected.
    #[test]
    fn summarize_actions_dedups_preserving_first_occurrence_order() {
        let actions = [
            "read".to_string(),
            "grep".to_string(),
            "read".to_string(),
            "bash".to_string(),
            "grep".to_string(),
        ];
        assert_eq!(summarize_actions(&actions), "read · grep · bash");
        assert_eq!(summarize_actions(&[]), "");
        assert_eq!(summarize_actions(&["edit".to_string()]), "edit");
    }
}