Skip to main content

lex_api/
handlers.rs

1//! Request routing for the agent API.
2//!
//! Each handler is a synchronous function that returns a ready-to-send
//! `Response<std::io::Cursor<Vec<u8>>>`: JSON API successes as 200 with
//! the JSON body, structured errors as 4xx/5xx with a JSON envelope.
//! The dispatcher (`handle`) picks the handler via `route` and sends
//! the resulting response back to the client.
7
8use indexmap::IndexMap;
9use lex_ast::canonicalize_program;
10use lex_bytecode::{compile_program, vm::Vm, Value};
11use lex_runtime::{check_program as check_policy, DefaultHandler, Policy};
12use lex_store::Store;
13use lex_syntax::load_program_from_str;
14use lex_vcs::{MergeSession, MergeSessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeSet, HashMap};
17use std::path::PathBuf;
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tiny_http::{Header, Method, Request, Response};
21
22pub struct State {
23    pub store: Mutex<Store>,
24    /// Filesystem root of the store. Held alongside the `Store`
25    /// itself so handlers that need to read store-level files
26    /// (e.g. `users.json` for actor auth) don't have to round-
27    /// trip through the lock.
28    pub root: PathBuf,
29    /// In-memory merge sessions, keyed by MergeSessionId. Sessions
30    /// are ephemeral by design (#134 foundation): they live for the
31    /// lifetime of the server process and are GC'd on commit. A
32    /// future slice can persist them to disk so a session survives
33    /// process restarts. For now an agent that gets unlucky with a
34    /// restart re-runs `merge/start` and gets a fresh session.
35    pub sessions: Mutex<HashMap<MergeSessionId, ApiMergeSession>>,
36}
37
38/// Server-side wrapper around [`MergeSession`] carrying the
39/// branch names that started the merge. The lex-vcs session
40/// itself only tracks `OpId` heads; commit needs the dst branch
41/// name to advance the right head, and the src branch name is
42/// kept for round-trip auditability ("which branch did we merge
43/// from?").
44pub struct ApiMergeSession {
45    pub inner: MergeSession,
46    pub src_branch: String,
47    pub dst_branch: String,
48}
49
50impl State {
51    pub fn open(root: PathBuf) -> anyhow::Result<Self> {
52        Ok(Self {
53            store: Mutex::new(Store::open(&root)?),
54            root,
55            sessions: Mutex::new(HashMap::new()),
56        })
57    }
58}
59
60#[derive(Debug, Serialize, Deserialize)]
61struct ErrorEnvelope {
62    error: String,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    detail: Option<serde_json::Value>,
65}
66
67fn json_response(status: u16, body: &serde_json::Value) -> Response<std::io::Cursor<Vec<u8>>> {
68    let bytes = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
69    Response::from_data(bytes)
70        .with_status_code(status)
71        .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
72}
73
74fn error_response(status: u16, msg: impl Into<String>) -> Response<std::io::Cursor<Vec<u8>>> {
75    json_response(status, &serde_json::to_value(ErrorEnvelope {
76        error: msg.into(), detail: None,
77    }).unwrap())
78}
79
80fn error_with_detail(status: u16, msg: impl Into<String>, detail: serde_json::Value)
81    -> Response<std::io::Cursor<Vec<u8>>>
82{
83    json_response(status, &serde_json::to_value(ErrorEnvelope {
84        error: msg.into(), detail: Some(detail),
85    }).unwrap())
86}
87
88pub fn handle(state: Arc<State>, mut req: Request) -> std::io::Result<()> {
89    let method = req.method().clone();
90    let url = req.url().to_string();
91    let path = url.split('?').next().unwrap_or("").to_string();
92    let query = url.split_once('?').map(|(_, q)| q.to_string()).unwrap_or_default();
93
94    // `X-Lex-User` is the v3d session identifier — set by humans
95    // operating the web UI through whatever proxy fronts auth, or
96    // by AI agents calling the JSON API. We pluck it once here so
97    // every handler can take it as a borrowed string.
98    let x_lex_user = req.headers().iter()
99        .find(|h| h.field.equiv("x-lex-user"))
100        .map(|h| h.value.as_str().to_string());
101
102    let mut body = String::new();
103    let _ = req.as_reader().read_to_string(&mut body);
104
105    let resp = route(&state, &method, &path, &query, &body, x_lex_user.as_deref());
106    req.respond(resp)
107}
108
109fn route(
110    state: &State,
111    method: &Method,
112    path: &str,
113    query: &str,
114    body: &str,
115    x_lex_user: Option<&str>,
116) -> Response<std::io::Cursor<Vec<u8>>> {
117    match (method, path) {
118        // ---- lex-tea v2 (HTML browser) ------------------------
119        (Method::Get, "/") => crate::web::activity_handler(state),
120        (Method::Get, "/web/branches") => crate::web::branches_handler(state),
121        (Method::Get, "/web/trust") => crate::web::trust_handler(state),
122        (Method::Get, "/web/attention") => crate::web::attention_handler(state),
123        (Method::Get, p) if p.starts_with("/web/branch/") => {
124            let name = &p["/web/branch/".len()..];
125            crate::web::branch_handler(state, name)
126        }
127        (Method::Get, p) if p.starts_with("/web/stage/") => {
128            let id = &p["/web/stage/".len()..];
129            crate::web::stage_html_handler(state, id)
130        }
131        // lex-tea v3 human-triage actions (#172). HTML forms post
132        // to /web/stage/<id>/{pin,defer,block,unblock} with a
133        // `reason` body. All four share one handler; the verb in
134        // the path picks the AttestationKind.
135        (Method::Post, p) if p.starts_with("/web/stage/") && (
136            p.ends_with("/pin") || p.ends_with("/defer")
137            || p.ends_with("/block") || p.ends_with("/unblock")
138        ) => {
139            let prefix_len = "/web/stage/".len();
140            let last_slash = p.rfind('/').unwrap_or(p.len());
141            let id = &p[prefix_len..last_slash];
142            let verb = &p[last_slash + 1..];
143            let decision = match verb {
144                "pin"     => crate::web::WebStageDecision::Pin,
145                "defer"   => crate::web::WebStageDecision::Defer,
146                "block"   => crate::web::WebStageDecision::Block,
147                "unblock" => crate::web::WebStageDecision::Unblock,
148                _ => unreachable!("matched in outer guard"),
149            };
150            crate::web::stage_decision_handler(state, id, body, decision, x_lex_user)
151        }
152        // ---- JSON API -----------------------------------------
153        (Method::Get, "/v1/health") => json_response(200, &serde_json::json!({"ok": true})),
154        (Method::Post, "/v1/parse") => parse_handler(body),
155        (Method::Post, "/v1/check") => check_handler(body),
156        (Method::Post, "/v1/publish") => publish_handler(state, body),
157        (Method::Post, "/v1/patch") => patch_handler(state, body),
158        (Method::Get, p) if p.starts_with("/v1/stage/") => {
159            let suffix = &p["/v1/stage/".len()..];
160            // Match `/v1/stage/<id>/attestations` first so a literal
161            // stage_id of "attestations" can't be misrouted.
162            if let Some(id) = suffix.strip_suffix("/attestations") {
163                stage_attestations_handler(state, id)
164            } else {
165                stage_handler(state, suffix)
166            }
167        }
168        (Method::Post, "/v1/run") => run_handler(state, body, false),
169        (Method::Post, "/v1/replay") => run_handler(state, body, true),
170        (Method::Get, p) if p.starts_with("/v1/trace/") => {
171            let id = &p["/v1/trace/".len()..];
172            trace_handler(state, id)
173        }
174        (Method::Get, "/v1/diff") => diff_handler(state, query),
175        (Method::Post, "/v1/merge/start") => merge_start_handler(state, body),
176        (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/resolve") => {
177            let id = &p["/v1/merge/".len()..p.len() - "/resolve".len()];
178            merge_resolve_handler(state, id, body)
179        }
180        (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/commit") => {
181            let id = &p["/v1/merge/".len()..p.len() - "/commit".len()];
182            merge_commit_handler(state, id)
183        }
184        _ => error_response(404, format!("unknown route: {method:?} {path}")),
185    }
186}
187
188#[derive(Deserialize)]
189struct ParseReq { source: String }
190
191fn parse_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
192    let req: ParseReq = match serde_json::from_str(body) {
193        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
194    };
195    match load_program_from_str(&req.source) {
196        Ok(prog) => {
197            let stages = canonicalize_program(&prog);
198            json_response(200, &serde_json::to_value(&stages).unwrap())
199        }
200        Err(e) => error_response(400, format!("syntax error: {e}")),
201    }
202}
203
204pub(crate) fn check_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
205    let req: ParseReq = match serde_json::from_str(body) {
206        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
207    };
208    let prog = match load_program_from_str(&req.source) {
209        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
210    };
211    let stages = canonicalize_program(&prog);
212    match lex_types::check_program(&stages) {
213        Ok(_) => json_response(200, &serde_json::json!({"ok": true})),
214        Err(errs) => json_response(422, &serde_json::to_value(&errs).unwrap()),
215    }
216}
217
218#[derive(Deserialize)]
219struct PublishReq { source: String, #[serde(default)] activate: bool }
220
221pub(crate) fn publish_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
222    let req: PublishReq = match serde_json::from_str(body) {
223        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
224    };
225    let prog = match load_program_from_str(&req.source) {
226        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
227    };
228    // #168: rewrite stdlib parse calls to parse_strict so the
229    // bytecode emitted from these stages enforces required-field
230    // checks at runtime.
231    let mut stages = canonicalize_program(&prog);
232    if let Err(errs) = lex_types::check_and_rewrite_program(&mut stages) {
233        return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
234    }
235
236    let store = state.store.lock().unwrap();
237    let branch = store.current_branch();
238
239    // Compute diff between what's already on the branch and the new program.
240    let old_head = match store.branch_head(&branch) {
241        Ok(h) => h,
242        Err(e) => return error_response(500, format!("branch_head: {e}")),
243    };
244    let old_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = old_head.values()
245        .filter_map(|stg| store.get_ast(stg).ok())
246        .filter_map(|s| match s {
247            lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd)),
248            _ => None,
249        })
250        .collect();
251    let new_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = stages.iter()
252        .filter_map(|s| match s {
253            lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd.clone())),
254            _ => None,
255        })
256        .collect();
257    let report = lex_vcs::compute_diff(&old_fns, &new_fns, false);
258
259    // Build new imports map from any Import stages in the source.
260    let mut new_imports: lex_vcs::ImportMap = lex_vcs::ImportMap::new();
261    {
262        let entry = new_imports.entry("<source>".into()).or_default();
263        for s in &stages {
264            if let lex_ast::Stage::Import(im) = s {
265                entry.insert(im.reference.clone());
266            }
267        }
268    }
269
270    match store.publish_program(&branch, &stages, &report, &new_imports, req.activate) {
271        Ok(outcome) => json_response(200, &serde_json::json!({
272            "ops": outcome.ops,
273            "head_op": outcome.head_op,
274        })),
275        // The store-write gate (#130) also type-checks at the top
276        // of `publish_program`. The handler above already pre-checks,
277        // so this branch is reached only on a race or a state we
278        // didn't see at handler time. Surface the structured
279        // envelope (422) instead of a generic 500 — same shape the
280        // initial pre-check uses, so a client only has one error
281        // contract to handle.
282        Err(lex_store::StoreError::TypeError(errs)) => {
283            error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap())
284        }
285        Err(e) => error_response(500, format!("publish_program: {e}")),
286    }
287}
288
289#[derive(Deserialize)]
290struct PatchReq {
291    stage_id: String,
292    patch: lex_ast::Patch,
293    #[serde(default)] activate: bool,
294}
295
296/// POST /v1/patch — apply a structured edit to a stored stage's
297/// canonical AST, type-check the result, and publish a new stage.
298fn patch_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
299    let req: PatchReq = match serde_json::from_str(body) {
300        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
301    };
302    let store = state.store.lock().unwrap();
303
304    // 1. Load.
305    let original = match store.get_ast(&req.stage_id) {
306        Ok(s) => s, Err(e) => return error_response(404, format!("stage: {e}")),
307    };
308
309    // 2. Apply.
310    let patched = match lex_ast::apply_patch(&original, &req.patch) {
311        Ok(s) => s,
312        Err(e) => return error_with_detail(422, "patch failed",
313            serde_json::to_value(&e).unwrap_or_default()),
314    };
315
316    // 3. Type-check the new stage in isolation.
317    let stages = vec![patched.clone()];
318    if let Err(errs) = lex_types::check_program(&stages) {
319        return error_with_detail(422, "type errors after patch",
320            serde_json::to_value(&errs).unwrap_or_default());
321    }
322
323    // Routing through apply_operation so /v1/patch participates in
324    // the op DAG. We know this op is always a body change on the
325    // existing sig (a patch can't add a brand-new fn).
326    let branch = store.current_branch();
327
328    // Find the sig — patched stage's sig must match the original's.
329    let sig = match lex_ast::sig_id(&patched) {
330        Some(s) => s,
331        None => return error_response(500, "patched stage has no sig_id"),
332    };
333
334    let new_id = match store.publish(&patched) {
335        Ok(id) => id, Err(e) => return error_response(500, format!("publish: {e}")),
336    };
337    if req.activate {
338        if let Err(e) = store.activate(&new_id) {
339            return error_response(500, format!("activate: {e}"));
340        }
341    }
342
343    // Determine op kind: ChangeEffectSig if effects differ, ModifyBody otherwise.
344    let original_effects: std::collections::BTreeSet<String> = match &original {
345        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
346        _ => std::collections::BTreeSet::new(),
347    };
348    let patched_effects: std::collections::BTreeSet<String> = match &patched {
349        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
350        _ => std::collections::BTreeSet::new(),
351    };
352    let head_now = match store.get_branch(&branch) {
353        Ok(b) => b.and_then(|b| b.head_op),
354        Err(e) => return error_response(500, format!("get_branch: {e}")),
355    };
356    let kind = if original_effects != patched_effects {
357        lex_vcs::OperationKind::ChangeEffectSig {
358            sig_id: sig.clone(),
359            from_stage_id: req.stage_id.clone(),
360            to_stage_id: new_id.clone(),
361            from_effects: original_effects,
362            to_effects: patched_effects,
363        }
364    } else {
365        lex_vcs::OperationKind::ModifyBody {
366            sig_id: sig.clone(),
367            from_stage_id: req.stage_id.clone(),
368            to_stage_id: new_id.clone(),
369        }
370    };
371    let transition = lex_vcs::StageTransition::Replace {
372        sig_id: sig.clone(),
373        from: req.stage_id.clone(),
374        to: new_id.clone(),
375    };
376    let op = lex_vcs::Operation::new(
377        kind,
378        head_now.into_iter().collect::<Vec<_>>(),
379    );
380    let op_id = match store.apply_operation(&branch, op, transition) {
381        Ok(id) => id,
382        Err(e) => return error_response(500, format!("apply_operation: {e}")),
383    };
384
385    let status = format!("{:?}",
386        store.get_status(&new_id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
387    json_response(200, &serde_json::json!({
388        "old_stage_id": req.stage_id,
389        "new_stage_id": new_id,
390        "sig_id": sig,
391        "status": status,
392        "op_id": op_id,
393    }))
394}
395
396pub(crate) fn stage_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
397    let store = state.store.lock().unwrap();
398    let meta = match store.get_metadata(id) {
399        Ok(m) => m, Err(e) => return error_response(404, format!("{e}")),
400    };
401    let ast = match store.get_ast(id) {
402        Ok(a) => a, Err(e) => return error_response(404, format!("{e}")),
403    };
404    let status = format!("{:?}", store.get_status(id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
405    json_response(200, &serde_json::json!({
406        "metadata": meta,
407        "ast": ast,
408        "status": status,
409    }))
410}
411
412/// `GET /v1/stage/<id>/attestations` — every persisted attestation
413/// for this stage, newest-first by timestamp. Issue #132's
414/// queryable-evidence consumer surface.
415///
416/// 404s on unknown stage_id (matches `/v1/stage/<id>`'s shape so a
417/// caller round-tripping both endpoints sees consistent errors).
418/// Empty list (200) is *evidence of absence*: the stage exists but
419/// no producer has attested it.
420pub(crate) fn stage_attestations_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
421    let store = state.store.lock().unwrap();
422    if let Err(e) = store.get_metadata(id) {
423        return error_response(404, format!("{e}"));
424    }
425    let log = match store.attestation_log() {
426        Ok(l) => l,
427        Err(e) => return error_response(500, format!("attestation log: {e}")),
428    };
429    let mut listing = match log.list_for_stage(&id.to_string()) {
430        Ok(v) => v,
431        Err(e) => return error_response(500, format!("list_for_stage: {e}")),
432    };
433    listing.sort_by_key(|a| std::cmp::Reverse(a.timestamp));
434    json_response(200, &serde_json::json!({"attestations": listing}))
435}
436
437#[derive(Deserialize, Default)]
438struct PolicyJson {
439    #[serde(default)] allow_effects: Vec<String>,
440    #[serde(default)] allow_fs_read: Vec<String>,
441    #[serde(default)] allow_fs_write: Vec<String>,
442    #[serde(default)] budget: Option<u64>,
443}
444
445impl PolicyJson {
446    fn into_policy(self) -> Policy {
447        Policy {
448            allow_effects: self.allow_effects.into_iter().collect::<BTreeSet<_>>(),
449            allow_fs_read: self.allow_fs_read.into_iter().map(PathBuf::from).collect(),
450            allow_fs_write: self.allow_fs_write.into_iter().map(PathBuf::from).collect(),
451            allow_net_host: Vec::new(),
452            allow_proc: Vec::new(),
453            budget: self.budget,
454        }
455    }
456}
457
458#[derive(Deserialize)]
459struct RunReq {
460    source: String,
461    #[serde(rename = "fn")] func: String,
462    #[serde(default)] args: Vec<serde_json::Value>,
463    #[serde(default)] policy: PolicyJson,
464    #[serde(default)] overrides: IndexMap<String, serde_json::Value>,
465}
466
467pub(crate) fn run_handler(state: &State, body: &str, with_overrides: bool) -> Response<std::io::Cursor<Vec<u8>>> {
468    let req: RunReq = match serde_json::from_str(body) {
469        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
470    };
471    let prog = match load_program_from_str(&req.source) {
472        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
473    };
474    let stages = canonicalize_program(&prog);
475    if let Err(errs) = lex_types::check_program(&stages) {
476        return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
477    }
478    let bc = compile_program(&stages);
479    let policy = req.policy.into_policy();
480    if let Err(violations) = check_policy(&bc, &policy) {
481        return error_with_detail(403, "policy violation", serde_json::to_value(&violations).unwrap());
482    }
483
484    let mut recorder = lex_trace::Recorder::new();
485    if with_overrides && !req.overrides.is_empty() {
486        recorder = recorder.with_overrides(req.overrides);
487    }
488    let handle = recorder.handle();
489    let handler = DefaultHandler::new(policy);
490    let mut vm = Vm::with_handler(&bc, Box::new(handler));
491    vm.set_tracer(Box::new(recorder));
492
493    let vargs: Vec<Value> = req.args.iter().map(json_to_value).collect();
494    let started = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
495    let result = vm.call(&req.func, vargs);
496    let ended = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
497
498    let store = state.store.lock().unwrap();
499    let (root_out, root_err, status) = match &result {
500        Ok(v) => (Some(value_to_json(v)), None, 200u16),
501        Err(e) => (None, Some(format!("{e}")), 200u16),
502    };
503    let tree = handle.finalize(req.func.clone(), serde_json::Value::Null,
504        root_out.clone(), root_err.clone(), started, ended);
505    let run_id = match store.save_trace(&tree) {
506        Ok(id) => id,
507        Err(e) => return error_response(500, format!("save_trace: {e}")),
508    };
509
510    let mut body = serde_json::json!({
511        "run_id": run_id,
512        "output": root_out,
513    });
514    if let Some(err) = root_err {
515        body["error"] = serde_json::Value::String(err);
516    }
517    json_response(status, &body)
518}
519
520fn trace_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
521    let store = state.store.lock().unwrap();
522    match store.load_trace(id) {
523        Ok(t) => json_response(200, &serde_json::to_value(&t).unwrap()),
524        Err(e) => error_response(404, format!("{e}")),
525    }
526}
527
528fn diff_handler(state: &State, query: &str) -> Response<std::io::Cursor<Vec<u8>>> {
529    let mut a = None;
530    let mut b = None;
531    for kv in query.split('&') {
532        if let Some((k, v)) = kv.split_once('=') {
533            match k { "a" => a = Some(v.to_string()), "b" => b = Some(v.to_string()), _ => {} }
534        }
535    }
536    let (Some(a), Some(b)) = (a, b) else {
537        return error_response(400, "missing a or b query params");
538    };
539    let store = state.store.lock().unwrap();
540    let ta = match store.load_trace(&a) { Ok(t) => t, Err(e) => return error_response(404, format!("a: {e}")) };
541    let tb = match store.load_trace(&b) { Ok(t) => t, Err(e) => return error_response(404, format!("b: {e}")) };
542    match lex_trace::diff_runs(&ta, &tb) {
543        Some(d) => json_response(200, &serde_json::to_value(&d).unwrap()),
544        None => json_response(200, &serde_json::json!({"divergence": null})),
545    }
546}
547
548fn json_to_value(v: &serde_json::Value) -> Value { Value::from_json(v) }
549
550fn value_to_json(v: &Value) -> serde_json::Value { v.to_json() }
551
552#[derive(Deserialize)]
553struct MergeStartReq {
554    src_branch: String,
555    dst_branch: String,
556}
557
558/// `POST /v1/merge/start` (#134) — open a stateful merge between two
559/// branch heads and return the conflicts the agent needs to
560/// resolve. Auto-resolved sigs (one-sided changes, identical
561/// changes both sides) are returned for audit but don't block
562/// commit.
563///
564/// Response: `{ merge_id, src_head, dst_head, lca, conflicts,
565/// auto_resolved_count }`. The session is held in process memory
566/// keyed by `merge_id` for subsequent `resolve` / `commit` calls
567/// (next slices).
568fn merge_start_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
569    let req: MergeStartReq = match serde_json::from_str(body) {
570        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
571    };
572    let store = state.store.lock().unwrap();
573    let src_head = match store.get_branch(&req.src_branch) {
574        Ok(Some(b)) => b.head_op,
575        Ok(None) => return error_response(404, format!("unknown src branch `{}`", req.src_branch)),
576        Err(e) => return error_response(500, format!("src branch read: {e}")),
577    };
578    let dst_head = match store.get_branch(&req.dst_branch) {
579        Ok(Some(b)) => b.head_op,
580        Ok(None) => return error_response(404, format!("unknown dst branch `{}`", req.dst_branch)),
581        Err(e) => return error_response(500, format!("dst branch read: {e}")),
582    };
583    let log = match lex_vcs::OpLog::open(store.root()) {
584        Ok(l) => l,
585        Err(e) => return error_response(500, format!("op log: {e}")),
586    };
587    // Caller doesn't choose merge_ids — minted server-side from
588    // wall clock + a per-process counter avoids leaking session
589    // ids' shape into the public surface.
590    let merge_id = mint_merge_id();
591    let session = match MergeSession::start(
592        merge_id.clone(),
593        &log,
594        src_head.as_ref(),
595        dst_head.as_ref(),
596    ) {
597        Ok(s) => s,
598        Err(e) => return error_response(500, format!("merge start: {e}")),
599    };
600    let conflicts: Vec<&lex_vcs::ConflictRecord> = session.remaining_conflicts();
601    let auto_resolved_count = session.auto_resolved.len();
602    let body = serde_json::json!({
603        "merge_id": merge_id,
604        "src_head": session.src_head,
605        "dst_head": session.dst_head,
606        "lca":      session.lca,
607        "conflicts": conflicts,
608        "auto_resolved_count": auto_resolved_count,
609    });
610    drop(conflicts);
611    drop(store);
612    let wrapped = ApiMergeSession {
613        inner: session,
614        src_branch: req.src_branch,
615        dst_branch: req.dst_branch,
616    };
617    state.sessions.lock().unwrap().insert(merge_id, wrapped);
618    json_response(200, &body)
619}
620
621#[derive(Deserialize)]
622struct MergeResolveReq {
623    /// Each entry is `(conflict_id, resolution)`. The resolution is
624    /// the same shape as `lex_vcs::Resolution`'s tagged JSON form
625    /// — `{"kind":"take_ours"}`, `{"kind":"take_theirs"}`,
626    /// `{"kind":"defer"}`, or `{"kind":"custom","op":{...}}`.
627    resolutions: Vec<MergeResolveEntry>,
628}
629
630#[derive(Deserialize)]
631struct MergeResolveEntry {
632    conflict_id: String,
633    resolution: lex_vcs::Resolution,
634}
635
636/// `POST /v1/merge/<id>/resolve` (#134) — submit batched
637/// resolutions against the conflicts surfaced by `merge/start`.
638/// Returns one verdict per input: accepted (recorded against the
639/// session) or rejected (with structured reason). The session
640/// stays alive across calls so an agent can iterate.
641///
642/// Errors:
643/// - 404 if `merge_id` doesn't refer to a live session (a typo
644///   or a session GC'd by a server restart).
645/// - 400 on malformed body.
646fn merge_resolve_handler(
647    state: &State,
648    merge_id: &str,
649    body: &str,
650) -> Response<std::io::Cursor<Vec<u8>>> {
651    let req: MergeResolveReq = match serde_json::from_str(body) {
652        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
653    };
654    let mut sessions = state.sessions.lock().unwrap();
655    let Some(wrapped) = sessions.get_mut(merge_id) else {
656        return error_response(404, format!("unknown merge_id `{merge_id}`"));
657    };
658    let pairs: Vec<(String, lex_vcs::Resolution)> = req.resolutions.into_iter()
659        .map(|e| (e.conflict_id, e.resolution))
660        .collect();
661    let verdicts = wrapped.inner.resolve(pairs);
662    let remaining: Vec<&lex_vcs::ConflictRecord> = wrapped.inner.remaining_conflicts();
663    let body = serde_json::json!({
664        "verdicts": verdicts,
665        "remaining_conflicts": remaining,
666    });
667    json_response(200, &body)
668}
669
670/// `POST /v1/merge/<id>/commit` (#134) — finalize a merge
671/// session. Builds a `Merge` op from the auto-resolved sigs +
672/// the conflict resolutions, applies it to the dst branch, and
673/// returns the new head op id. The session is dropped on
674/// success; the caller would re-run `merge/start` to land
675/// further changes.
676///
677/// Errors:
678/// - 404: unknown `merge_id`.
679/// - 422: conflicts remaining (pass `Defer` or just don't
680///   resolve a conflict and you land here). Body carries the
681///   list so the caller knows which still need attention.
682/// - 422: a `Custom` resolution was used. The data layer
683///   supports them but landing them via HTTP needs an extra
684///   pass to apply the custom op against the dst branch
685///   first; deferred to a follow-up slice. Use TakeOurs /
686///   TakeTheirs for now.
687/// - 500: filesystem error while landing the merge op.
688fn merge_commit_handler(
689    state: &State,
690    merge_id: &str,
691) -> Response<std::io::Cursor<Vec<u8>>> {
692    use std::collections::BTreeMap;
693    let wrapped = match state.sessions.lock().unwrap().remove(merge_id) {
694        Some(w) => w,
695        None => return error_response(404, format!("unknown merge_id `{merge_id}`")),
696    };
697    let dst_branch = wrapped.dst_branch.clone();
698    let src_head = wrapped.inner.src_head.clone();
699    let dst_head = wrapped.inner.dst_head.clone();
700    let auto_resolved = wrapped.inner.auto_resolved.clone();
701
702    // Translate auto-resolved + resolutions into the StageTransition::Merge
703    // entries map. Only sigs whose head changes relative to dst go in.
704    let mut entries: BTreeMap<lex_vcs::SigId, Option<lex_vcs::StageId>> = BTreeMap::new();
705
706    // Auto-resolved: only `Src` (one-sided change on src) modifies dst.
707    for outcome in &auto_resolved {
708        if let lex_vcs::MergeOutcome::Src { sig_id, stage_id } = outcome {
709            entries.insert(sig_id.clone(), stage_id.clone());
710        }
711    }
712
713    // Conflict resolutions.
714    let resolved = match wrapped.inner.commit() {
715        Ok(r) => r,
716        Err(lex_vcs::CommitError::ConflictsRemaining(ids)) => {
717            // Re-insert isn't possible since we removed above; the
718            // caller will need to re-start. That's acceptable: a
719            // commit-with-unresolved-conflicts is operator error.
720            return error_with_detail(
721                422,
722                "conflicts remaining",
723                serde_json::json!({"unresolved": ids}),
724            );
725        }
726    };
727
728    for (conflict_id, resolution) in resolved {
729        match resolution {
730            lex_vcs::Resolution::TakeOurs => {
731                // Dst already has its head. No entry needed.
732            }
733            lex_vcs::Resolution::TakeTheirs => {
734                // Find the conflict's `theirs` stage_id in the
735                // session snapshot. We don't have direct access to
736                // it post-commit (commit consumed the session); but
737                // we can reconstruct from `auto_resolved` plus the
738                // session's pre-commit conflict map. Since we
739                // already moved the inner session, the cleanest fix
740                // for this slice is to rebuild from the on-disk
741                // graph: walk src_head, find the latest stage for
742                // the conflict's sig.
743                match resolve_take_theirs(state, &src_head, &conflict_id) {
744                    Ok(stage_id) => {
745                        entries.insert(conflict_id.clone(), stage_id);
746                    }
747                    Err(e) => return error_response(500, format!("resolve take_theirs: {e}")),
748                }
749            }
750            lex_vcs::Resolution::Custom { op } => {
751                // The agent's brand-new op carries the merge target
752                // in its kind (e.g. ModifyBody.to_stage_id). The op
753                // itself isn't separately recorded in the log here
754                // — its head-map effect is folded into the merge
755                // op's entries map. Callers that want the op as a
756                // first-class history entry should publish it via
757                // /v1/publish first and submit a TakeTheirs/TakeOurs
758                // resolution against the resulting head.
759                match op.kind.merge_target() {
760                    Some((sig, stage)) => {
761                        if sig != conflict_id {
762                            return error_with_detail(
763                                422,
764                                "custom op targets a different sig than the conflict",
765                                serde_json::json!({
766                                    "conflict_id": conflict_id,
767                                    "op_targets": sig,
768                                }),
769                            );
770                        }
771                        entries.insert(conflict_id, stage);
772                    }
773                    None => {
774                        return error_with_detail(
775                            422,
776                            "custom op kind doesn't yield a single sig→stage delta",
777                            serde_json::json!({
778                                "conflict_id": conflict_id,
779                                "kind": serde_json::to_value(&op.kind).unwrap_or(serde_json::Value::Null),
780                            }),
781                        );
782                    }
783                }
784            }
785            lex_vcs::Resolution::Defer => {
786                // Unreachable: commit() rejects Defer above.
787                return error_response(500, "internal: Defer slipped past commit gate");
788            }
789        }
790    }
791
792    let resolved_count = entries.len();
793    let mut parents: Vec<lex_vcs::OpId> = Vec::new();
794    if let Some(d) = dst_head { parents.push(d); }
795    if let Some(s) = src_head { parents.push(s); }
796    let op = lex_vcs::Operation::new(
797        lex_vcs::OperationKind::Merge { resolved: resolved_count },
798        parents,
799    );
800    let transition = lex_vcs::StageTransition::Merge { entries };
801    let store = state.store.lock().unwrap();
802    match store.apply_operation(&dst_branch, op, transition) {
803        Ok(new_head_op) => json_response(200, &serde_json::json!({
804            "new_head_op": new_head_op,
805            "dst_branch": dst_branch,
806        })),
807        Err(e) => error_response(500, format!("apply merge op: {e}")),
808    }
809}
810
811/// Walk the op log from `src_head` backwards to find the latest
812/// stage assigned to `sig`. Used by the commit handler to figure
813/// out what stage `TakeTheirs` should land. `Ok(None)` means src
814/// removed the sig.
815fn resolve_take_theirs(
816    state: &State,
817    src_head: &Option<lex_vcs::OpId>,
818    sig: &lex_vcs::SigId,
819) -> std::io::Result<Option<lex_vcs::StageId>> {
820    let store = state.store.lock().unwrap();
821    let log = lex_vcs::OpLog::open(store.root())?;
822    let Some(head) = src_head.as_ref() else { return Ok(None); };
823    // Walk forward from root → head, replaying each op's transition
824    // for `sig`; the last assignment wins.
825    let mut current: Option<lex_vcs::StageId> = None;
826    for record in log.walk_forward(head, None)? {
827        match &record.produces {
828            lex_vcs::StageTransition::Create { sig_id, stage_id }
829                if sig_id == sig => { current = Some(stage_id.clone()); }
830            lex_vcs::StageTransition::Replace { sig_id, to, .. }
831                if sig_id == sig => { current = Some(to.clone()); }
832            lex_vcs::StageTransition::Remove { sig_id, .. }
833                if sig_id == sig => { current = None; }
834            lex_vcs::StageTransition::Rename { from, to, body_stage_id }
835                if from == sig || to == sig => {
836                if from == sig { current = None; }
837                if to == sig   { current = Some(body_stage_id.clone()); }
838            }
839            lex_vcs::StageTransition::Merge { entries } => {
840                if let Some(opt) = entries.get(sig) {
841                    current = opt.clone();
842                }
843            }
844            _ => {}
845        }
846    }
847    Ok(current)
848}
849
850fn mint_merge_id() -> MergeSessionId {
851    use std::sync::atomic::{AtomicU64, Ordering};
852    static COUNTER: AtomicU64 = AtomicU64::new(0);
853    let nanos = SystemTime::now()
854        .duration_since(UNIX_EPOCH)
855        .map(|d| d.as_nanos())
856        .unwrap_or(0);
857    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
858    format!("merge_{nanos:x}_{n:x}")
859}
860