use anyhow::{Context, Result};
use fs2::FileExt;
use std::fs::OpenOptions;
use std::path::Path;
use crate::{agent, config::Config, diff, frontmatter, git, merge, snapshot};
pub fn run(
file: &Path,
branch: bool,
agent_name: Option<&str>,
model: Option<&str>,
dry_run: bool,
no_git: bool,
config: &Config,
) -> Result<()> {
if !file.exists() {
anyhow::bail!("file not found: {}", file.display());
}
eprintln!("[run] starting for {}", file.display());
let the_diff = match diff::compute(file)? {
Some(d) => {
eprintln!("[run] diff computed ({} bytes)", d.len());
d
}
None => {
eprintln!("[run] Nothing changed since last run for {}", file.display());
return Ok(());
}
};
let raw_content = std::fs::read_to_string(file)?;
let (content_original, _session_id) = frontmatter::ensure_session(&raw_content)?;
if content_original != raw_content {
std::fs::write(file, &content_original)?;
}
let (fm, _body) = frontmatter::parse(&content_original)?;
let agent_name = agent_name
.or(fm.agent.as_deref())
.or(config.default_agent.as_deref())
.unwrap_or("claude");
let agent_config = config.agents.get(agent_name);
let expanded_env = if fm.env.is_empty() {
Vec::new()
} else {
match crate::env::expand_values(&fm.env) {
Ok(e) => e,
Err(e) => {
eprintln!("[run] env expansion failed: {} — continuing without env", e);
Vec::new()
}
}
};
let backend = agent::resolve(agent_name, agent_config, expanded_env)?;
let prompt = if fm.resume.is_some() {
format!(
"The user edited the session document. Here is the diff since the last run:\n\n\
<diff>\n{}\n</diff>\n\n\
The full document is now:\n\n\
<document>\n{}\n</document>\n\n\
Respond to the user's new content. Write your response in markdown.\n\
Do not include a ## Assistant heading — it will be added automatically.\n\
If the user asked questions inline (e.g., in blockquotes), address those too.",
the_diff, content_original
)
} else {
format!(
"The user is starting a session document. Here is the full document:\n\n\
<document>\n{}\n</document>\n\n\
Respond to the user's content. Write your response in markdown.\n\
Do not include a ## Assistant heading — it will be added automatically.\n\
If the user asked questions inline (e.g., in blockquotes), address those too.",
content_original
)
};
if dry_run {
eprintln!("--- Diff ---");
print!("{}", the_diff);
eprintln!("--- Prompt would be {} bytes ---", prompt.len());
return Ok(());
}
if branch && !no_git {
git::create_branch(file)?;
}
if !no_git {
git::commit(file)?;
}
eprintln!("Submitting to {}...", agent_name);
let fork = fm.resume.is_none();
let model = model.or(fm.model.as_deref());
let response = backend.send(&prompt, fm.resume.as_deref(), fork, model)?;
let mut content_ours = content_original.clone();
if let Some(ref sid) = response.session_id {
content_ours = frontmatter::set_resume_id(&content_ours, sid)?;
}
let response_text = crate::write::strip_assistant_heading(&response.text);
content_ours.push_str("\n## Assistant\n\n");
content_ours.push_str(&response_text);
content_ours.push_str("\n\n## User\n\n");
let doc_lock = acquire_doc_lock(file)?;
let content_current = std::fs::read_to_string(file)?;
let final_content = if content_current == content_original {
content_ours.clone()
} else {
eprintln!("File was modified during run. Merging changes...");
merge::merge_contents(&content_original, &content_ours, &content_current)?
};
let ipc_ok = crate::write::try_ipc_full_content(file, &final_content)?;
if !ipc_ok {
atomic_write(file, &final_content)?;
snapshot::save(file, &content_ours)?;
}
drop(doc_lock);
eprintln!("Response appended to {}", file.display());
Ok(())
}
fn acquire_doc_lock(path: &Path) -> Result<std::fs::File> {
let lock_path = crate::snapshot::lock_path_for(path)?;
if let Some(parent) = lock_path.parent() {
std::fs::create_dir_all(parent)?;
}
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&lock_path)
.with_context(|| format!("failed to open doc lock {}", lock_path.display()))?;
file.lock_exclusive()
.with_context(|| format!("failed to acquire doc lock on {}", lock_path.display()))?;
Ok(file)
}
fn atomic_write(path: &Path, content: &str) -> Result<()> {
use std::io::Write;
let parent = path.parent().unwrap_or(Path::new("."));
let mut tmp = tempfile::NamedTempFile::new_in(parent)
.with_context(|| format!("failed to create temp file in {}", parent.display()))?;
tmp.write_all(content.as_bytes())
.with_context(|| "failed to write temp file")?;
tmp.persist(path)
.with_context(|| format!("failed to rename temp file to {}", path.display()))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Barrier};
use tempfile::TempDir;
#[test]
fn acquire_doc_lock_succeeds() {
let dir = TempDir::new().unwrap();
let doc = dir.path().join("test.md");
std::fs::write(&doc, "content").unwrap();
let lock = acquire_doc_lock(&doc);
assert!(lock.is_ok());
}
#[test]
fn doc_lock_released_on_drop() {
let dir = TempDir::new().unwrap();
let doc = dir.path().join("test.md");
std::fs::write(&doc, "content").unwrap();
{
let _lock = acquire_doc_lock(&doc).unwrap();
}
let lock2 = acquire_doc_lock(&doc);
assert!(lock2.is_ok());
}
#[test]
fn atomic_write_correct_content() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("atomic.md");
atomic_write(&path, "hello world").unwrap();
assert_eq!(std::fs::read_to_string(&path).unwrap(), "hello world");
}
#[test]
fn atomic_write_overwrites_existing() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("overwrite.md");
std::fs::write(&path, "old content").unwrap();
atomic_write(&path, "new content").unwrap();
assert_eq!(std::fs::read_to_string(&path).unwrap(), "new content");
}
#[test]
fn concurrent_atomic_writes_no_corruption() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("concurrent.md");
std::fs::write(&path, "initial").unwrap();
let n = 20;
let barrier = Arc::new(Barrier::new(n));
let mut handles = Vec::new();
for i in 0..n {
let p = path.clone();
let bar = Arc::clone(&barrier);
let content = format!("writer-{}-content", i);
handles.push(std::thread::spawn(move || {
bar.wait();
atomic_write(&p, &content).unwrap();
}));
}
for h in handles {
h.join().unwrap();
}
let final_content = std::fs::read_to_string(&path).unwrap();
assert!(final_content.starts_with("writer-"));
assert!(final_content.ends_with("-content"));
}
#[test]
fn parallel_different_files_no_interference() {
let dir = TempDir::new().unwrap();
let doc_a = dir.path().join("a.md");
let doc_b = dir.path().join("b.md");
std::fs::write(&doc_a, "initial-a").unwrap();
std::fs::write(&doc_b, "initial-b").unwrap();
let barrier = Arc::new(Barrier::new(2));
let bar_a = Arc::clone(&barrier);
let path_a = doc_a.clone();
let ha = std::thread::spawn(move || {
let _lock = acquire_doc_lock(&path_a).unwrap();
bar_a.wait(); let content = std::fs::read_to_string(&path_a).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
atomic_write(&path_a, &format!("{}\n## Assistant\nResponse A", content)).unwrap();
});
let bar_b = Arc::clone(&barrier);
let path_b = doc_b.clone();
let hb = std::thread::spawn(move || {
let _lock = acquire_doc_lock(&path_b).unwrap();
bar_b.wait(); let content = std::fs::read_to_string(&path_b).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
atomic_write(&path_b, &format!("{}\n## Assistant\nResponse B", content)).unwrap();
});
ha.join().unwrap();
hb.join().unwrap();
let a = std::fs::read_to_string(&doc_a).unwrap();
let b = std::fs::read_to_string(&doc_b).unwrap();
assert!(a.contains("Response A"), "Doc A missing response: {}", a);
assert!(b.contains("Response B"), "Doc B missing response: {}", b);
assert!(!a.contains("Response B"), "Doc A has B's response");
assert!(!b.contains("Response A"), "Doc B has A's response");
}
#[test]
fn same_file_serialized_by_flock() {
let dir = TempDir::new().unwrap();
let doc = dir.path().join("shared.md");
std::fs::write(&doc, "# Shared Doc\n").unwrap();
let barrier = Arc::new(Barrier::new(2));
let mut handles = Vec::new();
for i in 0..2 {
let path = doc.clone();
let bar = Arc::clone(&barrier);
handles.push(std::thread::spawn(move || {
bar.wait(); let lock = acquire_doc_lock(&path).unwrap();
let content = std::fs::read_to_string(&path).unwrap();
let updated = format!("{}writer-{}\n", content, i);
std::thread::sleep(std::time::Duration::from_millis(5));
atomic_write(&path, &updated).unwrap();
drop(lock);
}));
}
for h in handles {
h.join().unwrap();
}
let final_content = std::fs::read_to_string(&doc).unwrap();
assert!(final_content.contains("writer-0") && final_content.contains("writer-1"),
"Both writes should land (flock serializes): {}", final_content);
}
#[test]
fn flock_prevents_partial_read_during_write() {
let dir = TempDir::new().unwrap();
let doc = dir.path().join("partial.md");
std::fs::write(&doc, "before").unwrap();
let path_w = doc.clone();
let path_r = doc.clone();
let writer = std::thread::spawn(move || {
let lock = acquire_doc_lock(&path_w).unwrap();
std::thread::sleep(std::time::Duration::from_millis(50));
atomic_write(&path_w, "after").unwrap();
drop(lock);
});
std::thread::sleep(std::time::Duration::from_millis(5)); let reader = std::thread::spawn(move || {
let _lock = acquire_doc_lock(&path_r).unwrap();
std::fs::read_to_string(&path_r).unwrap()
});
writer.join().unwrap();
let read_content = reader.join().unwrap();
assert_eq!(read_content, "after", "Reader should see completed write, not partial state");
}
#[test]
fn merge_clean_no_conflicts() {
let dir = TempDir::new().unwrap();
let base_path = dir.path().join("base");
let ours_path = dir.path().join("ours");
let theirs_path = dir.path().join("theirs");
let base = "line 1\nline 2\nline 3\n";
let ours = "line 1\nline 2\nline 3\n\n## Assistant\n\nResponse here.\n";
let theirs = "line 1\nline 2\nline 3\n";
std::fs::write(&base_path, base).unwrap();
std::fs::write(&ours_path, ours).unwrap();
std::fs::write(&theirs_path, theirs).unwrap();
let output = std::process::Command::new("git")
.current_dir(dir.path())
.args([
"merge-file", "-p", "--diff3",
"-L", "agent-response",
"-L", "original",
"-L", "your-edits",
])
.arg(&ours_path)
.arg(&base_path)
.arg(&theirs_path)
.output()
.unwrap();
let merged = String::from_utf8(output.stdout).unwrap();
assert!(output.status.success(), "merge should be clean");
assert!(merged.contains("Response here."));
}
}