use std::path::PathBuf;
use std::sync::RwLock;
use determinishtic::Determinishtic;
use sacp::mcp_server::{McpConnectionTo, McpServer};
use sacp::schema::{
AvailableCommand, ContentBlock, ContentChunk, NewSessionRequest, Plan, PlanEntry,
PlanEntryPriority, PlanEntryStatus, PromptRequest, SessionId, SessionNotification,
SessionUpdate, TextContent,
};
use sacp::{Agent, Client, Conductor, ConnectionTo, Proxy, RunWithConnectionTo};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
const REWRITE_SLASH_COMMAND: &str = "retcon:rewrite-git-history";
#[derive(Debug, Deserialize, JsonSchema)]
struct ExecuteParams {
toml_spec: String,
build_command: Option<String>,
test_command: Option<String>,
skip_build: Option<bool>,
skip_test: Option<bool>,
}
#[derive(Debug, Serialize, JsonSchema)]
#[serde(tag = "status")]
enum ExecuteStatus {
Complete,
Stuck {
commit_index: usize,
commit_message: String,
reason: String,
},
Error {
message: String,
},
}
#[derive(Debug, Serialize, JsonSchema)]
struct ExecuteResult {
status: ExecuteStatus,
updated_toml: String,
}
struct AcpHooks {
connection: ConnectionTo<Conductor>,
session_id: SessionId,
commits: RwLock<Vec<String>>,
statuses: RwLock<Vec<PlanEntryStatus>>,
}
impl AcpHooks {
fn new(connection: ConnectionTo<Conductor>, session_id: SessionId) -> Self {
Self {
connection,
session_id,
commits: RwLock::new(Vec::new()),
statuses: RwLock::new(Vec::new()),
}
}
fn session_id_from_acp_url(acp_url: &str) -> SessionId {
let id = acp_url.strip_prefix("acp:").unwrap_or(acp_url);
SessionId::new(id)
}
fn send_plan(&self) {
let commits = self.commits.read().unwrap();
let statuses = self.statuses.read().unwrap();
let entries: Vec<PlanEntry> = commits
.iter()
.zip(statuses.iter())
.map(|(message, status)| {
PlanEntry::new(message.clone(), PlanEntryPriority::Medium, status.clone())
})
.collect();
let plan = Plan::new(entries);
let notification =
SessionNotification::new(self.session_id.clone(), SessionUpdate::Plan(plan));
let _ = self.connection.send_notification_to(Client, notification);
}
fn send_message(&self, text: &str) {
let chunk = ContentChunk::new(ContentBlock::Text(TextContent::new(text)));
let notification = SessionNotification::new(
self.session_id.clone(),
SessionUpdate::AgentMessageChunk(chunk),
);
let _ = self.connection.send_notification_to(Client, notification);
}
}
impl retcon::ExecuteHooks for AcpHooks {
fn report(&self, message: &str) {
self.send_message(message);
}
fn plan_init(&self, commits: &[&str]) {
{
let mut stored = self.commits.write().unwrap();
let mut statuses = self.statuses.write().unwrap();
stored.clear();
statuses.clear();
for commit in commits {
stored.push((*commit).to_string());
statuses.push(PlanEntryStatus::Pending);
}
}
self.send_plan();
}
fn plan_update(&self, commit_idx: usize, status: retcon::CommitStatus) {
{
let mut statuses = self.statuses.write().unwrap();
if commit_idx < statuses.len() {
statuses[commit_idx] = match status {
retcon::CommitStatus::Pending => PlanEntryStatus::Pending,
retcon::CommitStatus::InProgress => PlanEntryStatus::InProgress,
retcon::CommitStatus::Completed => PlanEntryStatus::Completed,
retcon::CommitStatus::Stuck => PlanEntryStatus::InProgress, };
}
}
self.send_plan();
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.init();
tracing::info!("retcon-proxy starting");
Proxy
.builder()
.on_receive_request_from(
Client,
async |req: NewSessionRequest, responder, connection| {
let cwd = req.cwd.clone();
connection
.build_session_from(req)
.with_mcp_server(make_mcp_server(cwd))?
.on_proxy_session_start(responder, async |_session_id| Ok(()))?;
Ok(())
},
sacp::on_receive_request!(),
)
.on_receive_request_from(
Client,
async |req: PromptRequest, responder, cx| {
if is_rewrite_command(&req) {
let modified = replace_with_canned_prompt(req);
cx.send_request_to(Agent, modified)
.forward_response_to(responder)
} else {
cx.send_request_to(Agent, req)
.forward_response_to(responder)
}
},
sacp::on_receive_request!(),
)
.on_receive_notification_from(
Agent,
async |mut notif: SessionNotification, cx| {
if let SessionUpdate::AvailableCommandsUpdate(ref mut update) = notif.update {
update.available_commands.push(AvailableCommand::new(
REWRITE_SLASH_COMMAND,
"Create a clean git history from messy commits",
));
}
cx.send_notification_to(Client, notif)
},
sacp::on_receive_notification!(),
)
.connect_to(sacp_tokio::Stdio::new())
.await?;
Ok(())
}
fn make_mcp_server(cwd: PathBuf) -> McpServer<Conductor, impl RunWithConnectionTo<Conductor>> {
McpServer::builder("retcon")
.instructions(
"Git history rewriting tools. Use execute-git-rewrite to run a rewrite \
from a TOML specification. The tool will use the agent to extract and \
apply changes, creating clean commits.",
)
.tool_fn(
"execute-git-rewrite",
"Execute a git history rewrite from a TOML specification. \
Returns the updated spec with execution history and a status \
indicating completion or where it got stuck.",
async move |params: ExecuteParams, cx: McpConnectionTo<Conductor>| {
execute_tool(params, &cwd, cx).await
},
sacp::tool_fn!(),
)
.build()
}
fn is_rewrite_command(request: &PromptRequest) -> bool {
let text: String = request
.prompt
.iter()
.filter_map(|block| match block {
ContentBlock::Text(TextContent { text, .. }) => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join(" ");
let text = text.trim();
text == format!("/{}", REWRITE_SLASH_COMMAND) || text == REWRITE_SLASH_COMMAND
}
fn replace_with_canned_prompt(mut request: PromptRequest) -> PromptRequest {
request.prompt = vec![ContentBlock::Text(TextContent::new(retcon::prompt()))];
request
}
async fn execute_tool(
params: ExecuteParams,
cwd: &PathBuf,
cx: McpConnectionTo<Conductor>,
) -> Result<ExecuteResult, sacp::Error> {
let spec = retcon::HistorySpec::from_toml(¶ms.toml_spec)
.map_err(|e| sacp::Error::invalid_params().data(e.to_string()))?;
let git = retcon::Git::discover(cwd)
.map_err(|e| sacp::Error::internal_error().data(e.to_string()))?;
let config = retcon::ExecuteConfig {
build_command: if params.skip_build.unwrap_or(false) {
None
} else {
params
.build_command
.or_else(|| Some("cargo check --all --workspace".to_string()))
},
test_command: if params.skip_test.unwrap_or(false) {
None
} else {
params
.test_command
.or_else(|| Some("cargo test --all --workspace".to_string()))
},
agent: None,
};
let connection = cx.connection_to();
let session_id = AcpHooks::session_id_from_acp_url(&cx.acp_url());
let hooks = AcpHooks::new(connection.clone(), session_id);
let d = Determinishtic::from_connection(connection);
let result = retcon::execute_with_connection(&d, spec, &git, &config, &hooks).await;
let (spec, error) = match result {
Ok(spec) => (spec, None),
Err((spec, e)) => (spec, Some(e)),
};
let updated_toml = spec
.to_toml()
.map_err(|e| sacp::Error::internal_error().data(e.to_string()))?;
let status = if let Some(e) = error {
ExecuteStatus::Error {
message: e.to_string(),
}
} else if spec.commits.iter().all(|c| c.is_complete()) {
ExecuteStatus::Complete
} else if let Some((idx, commit)) = spec.commits.iter().enumerate().find(|(_, c)| c.is_stuck())
{
let reason = commit
.history
.last()
.and_then(|h| match h {
retcon::HistoryEntry::Stuck(r) => Some(r.clone()),
_ => None,
})
.unwrap_or_else(|| "Unknown reason".to_string());
ExecuteStatus::Stuck {
commit_index: idx,
commit_message: commit.message.clone(),
reason,
}
} else {
ExecuteStatus::Complete
};
Ok(ExecuteResult {
status,
updated_toml,
})
}