Skip to main content

lex_api/
handlers.rs

1//! Request routing for the agent API.
2//!
3//! Each handler is a synchronous function that returns
4//! `Result<serde_json::Value, ApiError>`. The dispatcher wraps the result
5//! in an HTTP response — successes as 200 with the JSON body, structured
6//! errors as 4xx/5xx with a JSON envelope.
7
8use indexmap::IndexMap;
9use lex_ast::canonicalize_program;
10use lex_bytecode::{compile_program, vm::Vm, Value};
11use lex_runtime::{check_program as check_policy, DefaultHandler, Policy};
12use lex_store::Store;
13use lex_syntax::load_program_from_str;
14use lex_vcs::{MergeSession, MergeSessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeSet, HashMap};
17use std::path::PathBuf;
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tiny_http::{Header, Method, Request, Response};
21
22pub struct State {
23    pub store: Mutex<Store>,
24    /// Filesystem root of the store. Held alongside the `Store`
25    /// itself so handlers that need to read store-level files
26    /// (e.g. `users.json` for actor auth) don't have to round-
27    /// trip through the lock.
28    pub root: PathBuf,
29    /// In-memory merge sessions, keyed by MergeSessionId. Sessions
30    /// are ephemeral by design (#134 foundation): they live for the
31    /// lifetime of the server process and are GC'd on commit. A
32    /// future slice can persist them to disk so a session survives
33    /// process restarts. For now an agent that gets unlucky with a
34    /// restart re-runs `merge/start` and gets a fresh session.
35    pub sessions: Mutex<HashMap<MergeSessionId, ApiMergeSession>>,
36}
37
38/// Server-side wrapper around [`MergeSession`] carrying the
39/// branch names that started the merge. The lex-vcs session
40/// itself only tracks `OpId` heads; commit needs the dst branch
41/// name to advance the right head, and the src branch name is
42/// kept for round-trip auditability ("which branch did we merge
43/// from?").
44pub struct ApiMergeSession {
45    pub inner: MergeSession,
46    pub src_branch: String,
47    pub dst_branch: String,
48}
49
50impl State {
51    pub fn open(root: PathBuf) -> anyhow::Result<Self> {
52        Ok(Self {
53            store: Mutex::new(Store::open(&root)?),
54            root,
55            sessions: Mutex::new(HashMap::new()),
56        })
57    }
58}
59
60#[derive(Debug, Serialize, Deserialize)]
61struct ErrorEnvelope {
62    error: String,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    detail: Option<serde_json::Value>,
65}
66
67fn json_response(status: u16, body: &serde_json::Value) -> Response<std::io::Cursor<Vec<u8>>> {
68    let bytes = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
69    Response::from_data(bytes)
70        .with_status_code(status)
71        .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
72}
73
74fn error_response(status: u16, msg: impl Into<String>) -> Response<std::io::Cursor<Vec<u8>>> {
75    json_response(status, &serde_json::to_value(ErrorEnvelope {
76        error: msg.into(), detail: None,
77    }).unwrap())
78}
79
80fn error_with_detail(status: u16, msg: impl Into<String>, detail: serde_json::Value)
81    -> Response<std::io::Cursor<Vec<u8>>>
82{
83    json_response(status, &serde_json::to_value(ErrorEnvelope {
84        error: msg.into(), detail: Some(detail),
85    }).unwrap())
86}
87
88pub fn handle(state: Arc<State>, mut req: Request) -> std::io::Result<()> {
89    let method = req.method().clone();
90    let url = req.url().to_string();
91    let path = url.split('?').next().unwrap_or("").to_string();
92    let query = url.split_once('?').map(|(_, q)| q.to_string()).unwrap_or_default();
93
94    // `X-Lex-User` is the v3d session identifier — set by humans
95    // operating the web UI through whatever proxy fronts auth, or
96    // by AI agents calling the JSON API. We pluck it once here so
97    // every handler can take it as a borrowed string.
98    let x_lex_user = req.headers().iter()
99        .find(|h| h.field.equiv("x-lex-user"))
100        .map(|h| h.value.as_str().to_string());
101
102    let mut body = String::new();
103    let _ = req.as_reader().read_to_string(&mut body);
104
105    let resp = route(&state, &method, &path, &query, &body, x_lex_user.as_deref());
106    req.respond(resp)
107}
108
109fn route(
110    state: &State,
111    method: &Method,
112    path: &str,
113    query: &str,
114    body: &str,
115    x_lex_user: Option<&str>,
116) -> Response<std::io::Cursor<Vec<u8>>> {
117    match (method, path) {
118        // ---- lex-tea v2 (HTML browser) ------------------------
119        (Method::Get, "/") => crate::web::activity_handler(state),
120        (Method::Get, "/web/branches") => crate::web::branches_handler(state),
121        (Method::Get, "/web/trust") => crate::web::trust_handler(state),
122        (Method::Get, "/web/attention") => crate::web::attention_handler(state),
123        (Method::Get, p) if p.starts_with("/web/branch/") => {
124            let name = &p["/web/branch/".len()..];
125            crate::web::branch_handler(state, name)
126        }
127        (Method::Get, p) if p.starts_with("/web/stage/") => {
128            let id = &p["/web/stage/".len()..];
129            crate::web::stage_html_handler(state, id)
130        }
131        // lex-tea v3 human-triage actions (#172). HTML forms post
132        // to /web/stage/<id>/{pin,defer,block,unblock} with a
133        // `reason` body. All four share one handler; the verb in
134        // the path picks the AttestationKind.
135        (Method::Post, p) if p.starts_with("/web/stage/") && (
136            p.ends_with("/pin") || p.ends_with("/defer")
137            || p.ends_with("/block") || p.ends_with("/unblock")
138        ) => {
139            let prefix_len = "/web/stage/".len();
140            let last_slash = p.rfind('/').unwrap_or(p.len());
141            let id = &p[prefix_len..last_slash];
142            let verb = &p[last_slash + 1..];
143            let decision = match verb {
144                "pin"     => crate::web::WebStageDecision::Pin,
145                "defer"   => crate::web::WebStageDecision::Defer,
146                "block"   => crate::web::WebStageDecision::Block,
147                "unblock" => crate::web::WebStageDecision::Unblock,
148                _ => unreachable!("matched in outer guard"),
149            };
150            crate::web::stage_decision_handler(state, id, body, decision, x_lex_user)
151        }
152        // ---- JSON API -----------------------------------------
153        (Method::Get, "/v1/health") => json_response(200, &serde_json::json!({"ok": true})),
154        (Method::Post, "/v1/parse") => parse_handler(body),
155        (Method::Post, "/v1/check") => check_handler(body),
156        (Method::Post, "/v1/publish") => publish_handler(state, body),
157        (Method::Post, "/v1/patch") => patch_handler(state, body),
158        (Method::Get, p) if p.starts_with("/v1/stage/") => {
159            let suffix = &p["/v1/stage/".len()..];
160            // Match `/v1/stage/<id>/attestations` first so a literal
161            // stage_id of "attestations" can't be misrouted.
162            if let Some(id) = suffix.strip_suffix("/attestations") {
163                stage_attestations_handler(state, id)
164            } else {
165                stage_handler(state, suffix)
166            }
167        }
168        (Method::Post, "/v1/run") => run_handler(state, body, false),
169        (Method::Post, "/v1/replay") => run_handler(state, body, true),
170        (Method::Get, p) if p.starts_with("/v1/trace/") => {
171            let id = &p["/v1/trace/".len()..];
172            trace_handler(state, id)
173        }
174        (Method::Get, "/v1/diff") => diff_handler(state, query),
175        (Method::Post, "/v1/merge/start") => merge_start_handler(state, body),
176        (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/resolve") => {
177            let id = &p["/v1/merge/".len()..p.len() - "/resolve".len()];
178            merge_resolve_handler(state, id, body)
179        }
180        (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/commit") => {
181            let id = &p["/v1/merge/".len()..p.len() - "/commit".len()];
182            merge_commit_handler(state, id)
183        }
184        // ---- #242: append-only sync of op log + attestation log
185        (Method::Post, "/v1/ops/batch") => ops_batch_handler(state, body),
186        (Method::Post, "/v1/attestations/batch") => attestations_batch_handler(state, body),
187        // Probe endpoint for `lex op push` to discover the remote's
188        // current head before computing a delta. Returns
189        // `{ "head_op": Option<OpId> }`. `<branch>` is URL-encoded.
190        (Method::Get, p) if p.starts_with("/v1/branches/") && p.ends_with("/head") => {
191            let name = &p["/v1/branches/".len()..p.len() - "/head".len()];
192            branch_head_handler(state, name)
193        }
194        _ => error_response(404, format!("unknown route: {method:?} {path}")),
195    }
196}
197
198#[derive(Deserialize)]
199struct ParseReq { source: String }
200
201fn parse_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
202    let req: ParseReq = match serde_json::from_str(body) {
203        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
204    };
205    match load_program_from_str(&req.source) {
206        Ok(prog) => {
207            let stages = canonicalize_program(&prog);
208            json_response(200, &serde_json::to_value(&stages).unwrap())
209        }
210        Err(e) => error_response(400, format!("syntax error: {e}")),
211    }
212}
213
214pub(crate) fn check_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
215    let req: ParseReq = match serde_json::from_str(body) {
216        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
217    };
218    let prog = match load_program_from_str(&req.source) {
219        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
220    };
221    let stages = canonicalize_program(&prog);
222    match lex_types::check_program(&stages) {
223        Ok(_) => json_response(200, &serde_json::json!({"ok": true})),
224        Err(errs) => json_response(422, &serde_json::to_value(&errs).unwrap()),
225    }
226}
227
228#[derive(Deserialize)]
229struct PublishReq { source: String, #[serde(default)] activate: bool }
230
231pub(crate) fn publish_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
232    let req: PublishReq = match serde_json::from_str(body) {
233        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
234    };
235    let prog = match load_program_from_str(&req.source) {
236        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
237    };
238    // #168: rewrite stdlib parse calls to parse_strict so the
239    // bytecode emitted from these stages enforces required-field
240    // checks at runtime.
241    let mut stages = canonicalize_program(&prog);
242    if let Err(errs) = lex_types::check_and_rewrite_program(&mut stages) {
243        return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
244    }
245
246    let store = state.store.lock().unwrap();
247    let branch = store.current_branch();
248
249    // Compute diff between what's already on the branch and the new program.
250    let old_head = match store.branch_head(&branch) {
251        Ok(h) => h,
252        Err(e) => return error_response(500, format!("branch_head: {e}")),
253    };
254    let old_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = old_head.values()
255        .filter_map(|stg| store.get_ast(stg).ok())
256        .filter_map(|s| match s {
257            lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd)),
258            _ => None,
259        })
260        .collect();
261    let new_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = stages.iter()
262        .filter_map(|s| match s {
263            lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd.clone())),
264            _ => None,
265        })
266        .collect();
267    let report = lex_vcs::compute_diff(&old_fns, &new_fns, false);
268
269    // Build new imports map from any Import stages in the source.
270    let mut new_imports: lex_vcs::ImportMap = lex_vcs::ImportMap::new();
271    {
272        let entry = new_imports.entry("<source>".into()).or_default();
273        for s in &stages {
274            if let lex_ast::Stage::Import(im) = s {
275                entry.insert(im.reference.clone());
276            }
277        }
278    }
279
280    match store.publish_program(&branch, &stages, &report, &new_imports, req.activate) {
281        Ok(outcome) => json_response(200, &serde_json::json!({
282            "ops": outcome.ops,
283            "head_op": outcome.head_op,
284        })),
285        // The store-write gate (#130) also type-checks at the top
286        // of `publish_program`. The handler above already pre-checks,
287        // so this branch is reached only on a race or a state we
288        // didn't see at handler time. Surface the structured
289        // envelope (422) instead of a generic 500 — same shape the
290        // initial pre-check uses, so a client only has one error
291        // contract to handle.
292        Err(lex_store::StoreError::TypeError(errs)) => {
293            error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap())
294        }
295        Err(e) => error_response(500, format!("publish_program: {e}")),
296    }
297}
298
299#[derive(Deserialize)]
300struct PatchReq {
301    stage_id: String,
302    patch: lex_ast::Patch,
303    #[serde(default)] activate: bool,
304}
305
306/// POST /v1/patch — apply a structured edit to a stored stage's
307/// canonical AST, type-check the result, and publish a new stage.
308fn patch_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
309    let req: PatchReq = match serde_json::from_str(body) {
310        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
311    };
312    let store = state.store.lock().unwrap();
313
314    // 1. Load.
315    let original = match store.get_ast(&req.stage_id) {
316        Ok(s) => s, Err(e) => return error_response(404, format!("stage: {e}")),
317    };
318
319    // 2. Apply.
320    let patched = match lex_ast::apply_patch(&original, &req.patch) {
321        Ok(s) => s,
322        Err(e) => return error_with_detail(422, "patch failed",
323            serde_json::to_value(&e).unwrap_or_default()),
324    };
325
326    // 3. Type-check the new stage in isolation.
327    let stages = vec![patched.clone()];
328    if let Err(errs) = lex_types::check_program(&stages) {
329        return error_with_detail(422, "type errors after patch",
330            serde_json::to_value(&errs).unwrap_or_default());
331    }
332
333    // Routing through apply_operation so /v1/patch participates in
334    // the op DAG. We know this op is always a body change on the
335    // existing sig (a patch can't add a brand-new fn).
336    let branch = store.current_branch();
337
338    // Find the sig — patched stage's sig must match the original's.
339    let sig = match lex_ast::sig_id(&patched) {
340        Some(s) => s,
341        None => return error_response(500, "patched stage has no sig_id"),
342    };
343
344    let new_id = match store.publish(&patched) {
345        Ok(id) => id, Err(e) => return error_response(500, format!("publish: {e}")),
346    };
347    if req.activate {
348        if let Err(e) = store.activate(&new_id) {
349            return error_response(500, format!("activate: {e}"));
350        }
351    }
352
353    // Determine op kind: ChangeEffectSig if effects differ, ModifyBody otherwise.
354    let original_effects: std::collections::BTreeSet<String> = match &original {
355        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
356        _ => std::collections::BTreeSet::new(),
357    };
358    let patched_effects: std::collections::BTreeSet<String> = match &patched {
359        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
360        _ => std::collections::BTreeSet::new(),
361    };
362    let head_now = match store.get_branch(&branch) {
363        Ok(b) => b.and_then(|b| b.head_op),
364        Err(e) => return error_response(500, format!("get_branch: {e}")),
365    };
366    let kind = if original_effects != patched_effects {
367        // #247: budget delta is part of the canonical payload now.
368        // Patch endpoints don't currently rehydrate the AST to
369        // recompute budgets, so leave them None — clients that
370        // need budget tracking should publish through the diff
371        // pipeline (`lex publish`) where `compute_diff` populates
372        // them.
373        let from_budget = lex_vcs::operation_budget_from_effects(&original_effects);
374        let to_budget = lex_vcs::operation_budget_from_effects(&patched_effects);
375        lex_vcs::OperationKind::ChangeEffectSig {
376            sig_id: sig.clone(),
377            from_stage_id: req.stage_id.clone(),
378            to_stage_id: new_id.clone(),
379            from_effects: original_effects,
380            to_effects: patched_effects,
381            from_budget,
382            to_budget,
383        }
384    } else {
385        let budget = lex_vcs::operation_budget_from_effects(&original_effects);
386        lex_vcs::OperationKind::ModifyBody {
387            sig_id: sig.clone(),
388            from_stage_id: req.stage_id.clone(),
389            to_stage_id: new_id.clone(),
390            from_budget: budget,
391            to_budget: budget,
392        }
393    };
394    let transition = lex_vcs::StageTransition::Replace {
395        sig_id: sig.clone(),
396        from: req.stage_id.clone(),
397        to: new_id.clone(),
398    };
399    let op = lex_vcs::Operation::new(
400        kind,
401        head_now.into_iter().collect::<Vec<_>>(),
402    );
403    let op_id = match store.apply_operation(&branch, op, transition) {
404        Ok(id) => id,
405        Err(e) => return error_response(500, format!("apply_operation: {e}")),
406    };
407
408    let status = format!("{:?}",
409        store.get_status(&new_id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
410    json_response(200, &serde_json::json!({
411        "old_stage_id": req.stage_id,
412        "new_stage_id": new_id,
413        "sig_id": sig,
414        "status": status,
415        "op_id": op_id,
416    }))
417}
418
419pub(crate) fn stage_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
420    let store = state.store.lock().unwrap();
421    let meta = match store.get_metadata(id) {
422        Ok(m) => m, Err(e) => return error_response(404, format!("{e}")),
423    };
424    let ast = match store.get_ast(id) {
425        Ok(a) => a, Err(e) => return error_response(404, format!("{e}")),
426    };
427    let status = format!("{:?}", store.get_status(id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
428    json_response(200, &serde_json::json!({
429        "metadata": meta,
430        "ast": ast,
431        "status": status,
432    }))
433}
434
435/// `GET /v1/stage/<id>/attestations` — every persisted attestation
436/// for this stage, newest-first by timestamp. Issue #132's
437/// queryable-evidence consumer surface.
438///
439/// 404s on unknown stage_id (matches `/v1/stage/<id>`'s shape so a
440/// caller round-tripping both endpoints sees consistent errors).
441/// Empty list (200) is *evidence of absence*: the stage exists but
442/// no producer has attested it.
443pub(crate) fn stage_attestations_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
444    let store = state.store.lock().unwrap();
445    if let Err(e) = store.get_metadata(id) {
446        return error_response(404, format!("{e}"));
447    }
448    let log = match store.attestation_log() {
449        Ok(l) => l,
450        Err(e) => return error_response(500, format!("attestation log: {e}")),
451    };
452    let mut listing = match log.list_for_stage(&id.to_string()) {
453        Ok(v) => v,
454        Err(e) => return error_response(500, format!("list_for_stage: {e}")),
455    };
456    listing.sort_by_key(|a| std::cmp::Reverse(a.timestamp));
457    json_response(200, &serde_json::json!({"attestations": listing}))
458}
459
460#[derive(Deserialize, Default)]
461struct PolicyJson {
462    #[serde(default)] allow_effects: Vec<String>,
463    #[serde(default)] allow_fs_read: Vec<String>,
464    #[serde(default)] allow_fs_write: Vec<String>,
465    #[serde(default)] budget: Option<u64>,
466}
467
468impl PolicyJson {
469    fn into_policy(self) -> Policy {
470        Policy {
471            allow_effects: self.allow_effects.into_iter().collect::<BTreeSet<_>>(),
472            allow_fs_read: self.allow_fs_read.into_iter().map(PathBuf::from).collect(),
473            allow_fs_write: self.allow_fs_write.into_iter().map(PathBuf::from).collect(),
474            allow_net_host: Vec::new(),
475            allow_proc: Vec::new(),
476            budget: self.budget,
477        }
478    }
479}
480
481#[derive(Deserialize)]
482struct RunReq {
483    source: String,
484    #[serde(rename = "fn")] func: String,
485    #[serde(default)] args: Vec<serde_json::Value>,
486    #[serde(default)] policy: PolicyJson,
487    #[serde(default)] overrides: IndexMap<String, serde_json::Value>,
488}
489
490pub(crate) fn run_handler(state: &State, body: &str, with_overrides: bool) -> Response<std::io::Cursor<Vec<u8>>> {
491    let req: RunReq = match serde_json::from_str(body) {
492        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
493    };
494    let prog = match load_program_from_str(&req.source) {
495        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
496    };
497    let stages = canonicalize_program(&prog);
498    if let Err(errs) = lex_types::check_program(&stages) {
499        return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
500    }
501    let bc = compile_program(&stages);
502    let policy = req.policy.into_policy();
503    if let Err(violations) = check_policy(&bc, &policy) {
504        return error_with_detail(403, "policy violation", serde_json::to_value(&violations).unwrap());
505    }
506
507    let mut recorder = lex_trace::Recorder::new();
508    if with_overrides && !req.overrides.is_empty() {
509        recorder = recorder.with_overrides(req.overrides);
510    }
511    let handle = recorder.handle();
512    let handler = DefaultHandler::new(policy);
513    let mut vm = Vm::with_handler(&bc, Box::new(handler));
514    vm.set_tracer(Box::new(recorder));
515
516    let vargs: Vec<Value> = req.args.iter().map(json_to_value).collect();
517    let started = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
518    let result = vm.call(&req.func, vargs);
519    let ended = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
520
521    let store = state.store.lock().unwrap();
522    let (root_out, root_err, status) = match &result {
523        Ok(v) => (Some(value_to_json(v)), None, 200u16),
524        Err(e) => (None, Some(format!("{e}")), 200u16),
525    };
526    let tree = handle.finalize(req.func.clone(), serde_json::Value::Null,
527        root_out.clone(), root_err.clone(), started, ended);
528    let run_id = match store.save_trace(&tree) {
529        Ok(id) => id,
530        Err(e) => return error_response(500, format!("save_trace: {e}")),
531    };
532
533    let mut body = serde_json::json!({
534        "run_id": run_id,
535        "output": root_out,
536    });
537    if let Some(err) = root_err {
538        body["error"] = serde_json::Value::String(err);
539    }
540    json_response(status, &body)
541}
542
543fn trace_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
544    let store = state.store.lock().unwrap();
545    match store.load_trace(id) {
546        Ok(t) => json_response(200, &serde_json::to_value(&t).unwrap()),
547        Err(e) => error_response(404, format!("{e}")),
548    }
549}
550
551fn diff_handler(state: &State, query: &str) -> Response<std::io::Cursor<Vec<u8>>> {
552    let mut a = None;
553    let mut b = None;
554    for kv in query.split('&') {
555        if let Some((k, v)) = kv.split_once('=') {
556            match k { "a" => a = Some(v.to_string()), "b" => b = Some(v.to_string()), _ => {} }
557        }
558    }
559    let (Some(a), Some(b)) = (a, b) else {
560        return error_response(400, "missing a or b query params");
561    };
562    let store = state.store.lock().unwrap();
563    let ta = match store.load_trace(&a) { Ok(t) => t, Err(e) => return error_response(404, format!("a: {e}")) };
564    let tb = match store.load_trace(&b) { Ok(t) => t, Err(e) => return error_response(404, format!("b: {e}")) };
565    match lex_trace::diff_runs(&ta, &tb) {
566        Some(d) => json_response(200, &serde_json::to_value(&d).unwrap()),
567        None => json_response(200, &serde_json::json!({"divergence": null})),
568    }
569}
570
571fn json_to_value(v: &serde_json::Value) -> Value { Value::from_json(v) }
572
573fn value_to_json(v: &Value) -> serde_json::Value { v.to_json() }
574
575#[derive(Deserialize)]
576struct MergeStartReq {
577    src_branch: String,
578    dst_branch: String,
579}
580
581/// `POST /v1/merge/start` (#134) — open a stateful merge between two
582/// branch heads and return the conflicts the agent needs to
583/// resolve. Auto-resolved sigs (one-sided changes, identical
584/// changes both sides) are returned for audit but don't block
585/// commit.
586///
587/// Response: `{ merge_id, src_head, dst_head, lca, conflicts,
588/// auto_resolved_count }`. The session is held in process memory
589/// keyed by `merge_id` for subsequent `resolve` / `commit` calls
590/// (next slices).
591fn merge_start_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
592    let req: MergeStartReq = match serde_json::from_str(body) {
593        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
594    };
595    let store = state.store.lock().unwrap();
596    let src_head = match store.get_branch(&req.src_branch) {
597        Ok(Some(b)) => b.head_op,
598        Ok(None) => return error_response(404, format!("unknown src branch `{}`", req.src_branch)),
599        Err(e) => return error_response(500, format!("src branch read: {e}")),
600    };
601    let dst_head = match store.get_branch(&req.dst_branch) {
602        Ok(Some(b)) => b.head_op,
603        Ok(None) => return error_response(404, format!("unknown dst branch `{}`", req.dst_branch)),
604        Err(e) => return error_response(500, format!("dst branch read: {e}")),
605    };
606    let log = match lex_vcs::OpLog::open(store.root()) {
607        Ok(l) => l,
608        Err(e) => return error_response(500, format!("op log: {e}")),
609    };
610    // Caller doesn't choose merge_ids — minted server-side from
611    // wall clock + a per-process counter avoids leaking session
612    // ids' shape into the public surface.
613    let merge_id = mint_merge_id();
614    let session = match MergeSession::start(
615        merge_id.clone(),
616        &log,
617        src_head.as_ref(),
618        dst_head.as_ref(),
619    ) {
620        Ok(s) => s,
621        Err(e) => return error_response(500, format!("merge start: {e}")),
622    };
623    let conflicts: Vec<&lex_vcs::ConflictRecord> = session.remaining_conflicts();
624    let auto_resolved_count = session.auto_resolved.len();
625    let body = serde_json::json!({
626        "merge_id": merge_id,
627        "src_head": session.src_head,
628        "dst_head": session.dst_head,
629        "lca":      session.lca,
630        "conflicts": conflicts,
631        "auto_resolved_count": auto_resolved_count,
632    });
633    drop(conflicts);
634    drop(store);
635    let wrapped = ApiMergeSession {
636        inner: session,
637        src_branch: req.src_branch,
638        dst_branch: req.dst_branch,
639    };
640    state.sessions.lock().unwrap().insert(merge_id, wrapped);
641    json_response(200, &body)
642}
643
644#[derive(Deserialize)]
645struct MergeResolveReq {
646    /// Each entry is `(conflict_id, resolution)`. The resolution is
647    /// the same shape as `lex_vcs::Resolution`'s tagged JSON form
648    /// — `{"kind":"take_ours"}`, `{"kind":"take_theirs"}`,
649    /// `{"kind":"defer"}`, or `{"kind":"custom","op":{...}}`.
650    resolutions: Vec<MergeResolveEntry>,
651}
652
653#[derive(Deserialize)]
654struct MergeResolveEntry {
655    conflict_id: String,
656    resolution: lex_vcs::Resolution,
657}
658
659/// `POST /v1/merge/<id>/resolve` (#134) — submit batched
660/// resolutions against the conflicts surfaced by `merge/start`.
661/// Returns one verdict per input: accepted (recorded against the
662/// session) or rejected (with structured reason). The session
663/// stays alive across calls so an agent can iterate.
664///
665/// Errors:
666/// - 404 if `merge_id` doesn't refer to a live session (a typo
667///   or a session GC'd by a server restart).
668/// - 400 on malformed body.
669fn merge_resolve_handler(
670    state: &State,
671    merge_id: &str,
672    body: &str,
673) -> Response<std::io::Cursor<Vec<u8>>> {
674    let req: MergeResolveReq = match serde_json::from_str(body) {
675        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
676    };
677    let mut sessions = state.sessions.lock().unwrap();
678    let Some(wrapped) = sessions.get_mut(merge_id) else {
679        return error_response(404, format!("unknown merge_id `{merge_id}`"));
680    };
681    let pairs: Vec<(String, lex_vcs::Resolution)> = req.resolutions.into_iter()
682        .map(|e| (e.conflict_id, e.resolution))
683        .collect();
684    let verdicts = wrapped.inner.resolve(pairs);
685    let remaining: Vec<&lex_vcs::ConflictRecord> = wrapped.inner.remaining_conflicts();
686    let body = serde_json::json!({
687        "verdicts": verdicts,
688        "remaining_conflicts": remaining,
689    });
690    json_response(200, &body)
691}
692
693/// `POST /v1/merge/<id>/commit` (#134) — finalize a merge
694/// session. Builds a `Merge` op from the auto-resolved sigs +
695/// the conflict resolutions, applies it to the dst branch, and
696/// returns the new head op id. The session is dropped on
697/// success; the caller would re-run `merge/start` to land
698/// further changes.
699///
700/// Errors:
701/// - 404: unknown `merge_id`.
702/// - 422: conflicts remaining (pass `Defer` or just don't
703///   resolve a conflict and you land here). Body carries the
704///   list so the caller knows which still need attention.
705/// - 422: a `Custom` resolution was used. The data layer
706///   supports them but landing them via HTTP needs an extra
707///   pass to apply the custom op against the dst branch
708///   first; deferred to a follow-up slice. Use TakeOurs /
709///   TakeTheirs for now.
710/// - 500: filesystem error while landing the merge op.
711fn merge_commit_handler(
712    state: &State,
713    merge_id: &str,
714) -> Response<std::io::Cursor<Vec<u8>>> {
715    use std::collections::BTreeMap;
716    let wrapped = match state.sessions.lock().unwrap().remove(merge_id) {
717        Some(w) => w,
718        None => return error_response(404, format!("unknown merge_id `{merge_id}`")),
719    };
720    let dst_branch = wrapped.dst_branch.clone();
721    let src_head = wrapped.inner.src_head.clone();
722    let dst_head = wrapped.inner.dst_head.clone();
723    let auto_resolved = wrapped.inner.auto_resolved.clone();
724
725    // Translate auto-resolved + resolutions into the StageTransition::Merge
726    // entries map. Only sigs whose head changes relative to dst go in.
727    let mut entries: BTreeMap<lex_vcs::SigId, Option<lex_vcs::StageId>> = BTreeMap::new();
728
729    // Auto-resolved: only `Src` (one-sided change on src) modifies dst.
730    for outcome in &auto_resolved {
731        if let lex_vcs::MergeOutcome::Src { sig_id, stage_id } = outcome {
732            entries.insert(sig_id.clone(), stage_id.clone());
733        }
734    }
735
736    // Conflict resolutions.
737    let resolved = match wrapped.inner.commit() {
738        Ok(r) => r,
739        Err(lex_vcs::CommitError::ConflictsRemaining(ids)) => {
740            // Re-insert isn't possible since we removed above; the
741            // caller will need to re-start. That's acceptable: a
742            // commit-with-unresolved-conflicts is operator error.
743            return error_with_detail(
744                422,
745                "conflicts remaining",
746                serde_json::json!({"unresolved": ids}),
747            );
748        }
749    };
750
751    for (conflict_id, resolution) in resolved {
752        match resolution {
753            lex_vcs::Resolution::TakeOurs => {
754                // Dst already has its head. No entry needed.
755            }
756            lex_vcs::Resolution::TakeTheirs => {
757                // Find the conflict's `theirs` stage_id in the
758                // session snapshot. We don't have direct access to
759                // it post-commit (commit consumed the session); but
760                // we can reconstruct from `auto_resolved` plus the
761                // session's pre-commit conflict map. Since we
762                // already moved the inner session, the cleanest fix
763                // for this slice is to rebuild from the on-disk
764                // graph: walk src_head, find the latest stage for
765                // the conflict's sig.
766                match resolve_take_theirs(state, &src_head, &conflict_id) {
767                    Ok(stage_id) => {
768                        entries.insert(conflict_id.clone(), stage_id);
769                    }
770                    Err(e) => return error_response(500, format!("resolve take_theirs: {e}")),
771                }
772            }
773            lex_vcs::Resolution::Custom { op } => {
774                // The agent's brand-new op carries the merge target
775                // in its kind (e.g. ModifyBody.to_stage_id). The op
776                // itself isn't separately recorded in the log here
777                // — its head-map effect is folded into the merge
778                // op's entries map. Callers that want the op as a
779                // first-class history entry should publish it via
780                // /v1/publish first and submit a TakeTheirs/TakeOurs
781                // resolution against the resulting head.
782                match op.kind.merge_target() {
783                    Some((sig, stage)) => {
784                        if sig != conflict_id {
785                            return error_with_detail(
786                                422,
787                                "custom op targets a different sig than the conflict",
788                                serde_json::json!({
789                                    "conflict_id": conflict_id,
790                                    "op_targets": sig,
791                                }),
792                            );
793                        }
794                        entries.insert(conflict_id, stage);
795                    }
796                    None => {
797                        return error_with_detail(
798                            422,
799                            "custom op kind doesn't yield a single sig→stage delta",
800                            serde_json::json!({
801                                "conflict_id": conflict_id,
802                                "kind": serde_json::to_value(&op.kind).unwrap_or(serde_json::Value::Null),
803                            }),
804                        );
805                    }
806                }
807            }
808            lex_vcs::Resolution::Defer => {
809                // Unreachable: commit() rejects Defer above.
810                return error_response(500, "internal: Defer slipped past commit gate");
811            }
812        }
813    }
814
815    let resolved_count = entries.len();
816    let mut parents: Vec<lex_vcs::OpId> = Vec::new();
817    if let Some(d) = dst_head { parents.push(d); }
818    if let Some(s) = src_head { parents.push(s); }
819    let op = lex_vcs::Operation::new(
820        lex_vcs::OperationKind::Merge { resolved: resolved_count },
821        parents,
822    );
823    let transition = lex_vcs::StageTransition::Merge { entries };
824    let store = state.store.lock().unwrap();
825    match store.apply_operation(&dst_branch, op, transition) {
826        Ok(new_head_op) => json_response(200, &serde_json::json!({
827            "new_head_op": new_head_op,
828            "dst_branch": dst_branch,
829        })),
830        Err(e) => error_response(500, format!("apply merge op: {e}")),
831    }
832}
833
834/// Walk the op log from `src_head` backwards to find the latest
835/// stage assigned to `sig`. Used by the commit handler to figure
836/// out what stage `TakeTheirs` should land. `Ok(None)` means src
837/// removed the sig.
838fn resolve_take_theirs(
839    state: &State,
840    src_head: &Option<lex_vcs::OpId>,
841    sig: &lex_vcs::SigId,
842) -> std::io::Result<Option<lex_vcs::StageId>> {
843    let store = state.store.lock().unwrap();
844    let log = lex_vcs::OpLog::open(store.root())?;
845    let Some(head) = src_head.as_ref() else { return Ok(None); };
846    // Walk forward from root → head, replaying each op's transition
847    // for `sig`; the last assignment wins.
848    let mut current: Option<lex_vcs::StageId> = None;
849    for record in log.walk_forward(head, None)? {
850        match &record.produces {
851            lex_vcs::StageTransition::Create { sig_id, stage_id }
852                if sig_id == sig => { current = Some(stage_id.clone()); }
853            lex_vcs::StageTransition::Replace { sig_id, to, .. }
854                if sig_id == sig => { current = Some(to.clone()); }
855            lex_vcs::StageTransition::Remove { sig_id, .. }
856                if sig_id == sig => { current = None; }
857            lex_vcs::StageTransition::Rename { from, to, body_stage_id }
858                if from == sig || to == sig => {
859                if from == sig { current = None; }
860                if to == sig   { current = Some(body_stage_id.clone()); }
861            }
862            lex_vcs::StageTransition::Merge { entries } => {
863                if let Some(opt) = entries.get(sig) {
864                    current = opt.clone();
865                }
866            }
867            _ => {}
868        }
869    }
870    Ok(current)
871}
872
873fn mint_merge_id() -> MergeSessionId {
874    use std::sync::atomic::{AtomicU64, Ordering};
875    static COUNTER: AtomicU64 = AtomicU64::new(0);
876    let nanos = SystemTime::now()
877        .duration_since(UNIX_EPOCH)
878        .map(|d| d.as_nanos())
879        .unwrap_or(0);
880    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
881    format!("merge_{nanos:x}_{n:x}")
882}
883
884// ---- #242: append-only sync ---------------------------------------
885
886/// `POST /v1/ops/batch` (#242). Server endpoint for `lex op push`.
887///
888/// Body: a JSON array of `OperationRecord`s. The handler validates
889/// DAG integrity by checking that every op's `parents` either
890/// already exist on the remote *or* appear earlier in the same
891/// batch. This lets a client send a topologically-ordered slice
892/// without first probing for what's already there.
893///
894/// Response shape:
895///
896/// ```json
897/// { "received": N, "added": M, "skipped": (N-M), "added_ids": [...] }
898/// ```
899///
900/// Failure modes:
901///
902/// * `400` — body isn't a JSON array of op records.
903/// * `422` with `{ "error": "MissingParent", "detail": { "op_id":
904///   ..., "missing_parent": ... } }` if a parent is unreachable.
905///   The whole batch is rejected; nothing is persisted. The client
906///   should backfill the missing op and retry.
907/// * `409` if the supplied `op_id` doesn't match the canonical
908///   hash of the record's payload — content addressing must hold
909///   over the wire.
910///
911/// Idempotency: a record whose `op_id` already exists is silently
912/// skipped (not added, not rejected). Pushing the same payload
913/// twice is `received == N, added == 0` on the second call.
914pub(crate) fn ops_batch_handler(state: &State, body: &str)
915    -> Response<std::io::Cursor<Vec<u8>>>
916{
917    let records: Vec<lex_vcs::OperationRecord> = match serde_json::from_str(body) {
918        Ok(r) => r,
919        Err(e) => return error_response(400,
920            format!("body must be a JSON array of OperationRecord: {e}")),
921    };
922    let store = state.store.lock().unwrap();
923    let log = match lex_vcs::OpLog::open(store.root()) {
924        Ok(l) => l,
925        Err(e) => return error_response(500, format!("opening op log: {e}")),
926    };
927
928    // Validate every record before persisting any of them.
929    //
930    // 1. Content-addressing: the supplied `op_id` must match the
931    //    canonical hash of `record.op`. Otherwise the client is
932    //    sending a forged or corrupted record.
933    // 2. DAG integrity: every parent must either already exist in
934    //    the local log OR appear earlier in this batch.
935    let mut batch_ids: std::collections::BTreeSet<lex_vcs::OpId> =
936        std::collections::BTreeSet::new();
937    for rec in &records {
938        let expected = rec.op.op_id();
939        if expected != rec.op_id {
940            return error_with_detail(409, "OpIdMismatch", serde_json::json!({
941                "supplied": rec.op_id,
942                "expected": expected,
943            }));
944        }
945        for parent in &rec.op.parents {
946            let known = match log.get(parent) {
947                Ok(Some(_)) => true,
948                Ok(None) => false,
949                Err(e) => return error_response(500, format!("op log read: {e}")),
950            };
951            if !known && !batch_ids.contains(parent) {
952                return error_with_detail(422, "MissingParent", serde_json::json!({
953                    "op_id": rec.op_id,
954                    "missing_parent": parent,
955                }));
956            }
957        }
958        batch_ids.insert(rec.op_id.clone());
959    }
960
961    // Persist. `OpLog::put` is idempotent so a re-push is a no-op
962    // for already-present records.
963    let mut added = 0usize;
964    let mut added_ids: Vec<&lex_vcs::OpId> = Vec::new();
965    for rec in &records {
966        let already_present = matches!(log.get(&rec.op_id), Ok(Some(_)));
967        match log.put(rec) {
968            Ok(()) => {
969                if !already_present {
970                    added += 1;
971                    added_ids.push(&rec.op_id);
972                }
973            }
974            Err(e) => return error_response(500, format!("op log write: {e}")),
975        }
976    }
977
978    json_response(200, &serde_json::json!({
979        "received": records.len(),
980        "added": added,
981        "skipped": records.len() - added,
982        "added_ids": added_ids,
983    }))
984}
985
986/// `POST /v1/attestations/batch` (#242). Server endpoint for `lex
987/// attest push`.
988///
989/// Body: a JSON array of `Attestation`s. Validates that each
990/// attestation's `op_id` (when set) refers to an op that already
991/// exists on the remote — `attestation_id` is then re-derivable
992/// from the canonical form, so cross-store dedup just works.
993///
994/// Response: same shape as `ops_batch_handler` but `added_ids` is
995/// the list of accepted `attestation_id`s.
996///
997/// Failure modes:
998///
999/// * `400` for malformed JSON.
1000/// * `422` with `{ "error": "UnknownOp", "detail": { ... } }` if
1001///   an attestation's `op_id` references an op the remote doesn't
1002///   know about. Whole batch rejected.
1003/// * `409` `AttestationIdMismatch` if the supplied id doesn't
1004///   match the canonical hash.
1005///
1006/// Idempotency: same as the ops endpoint — content-addressed dedup.
1007pub(crate) fn attestations_batch_handler(state: &State, body: &str)
1008    -> Response<std::io::Cursor<Vec<u8>>>
1009{
1010    let attestations: Vec<lex_vcs::Attestation> = match serde_json::from_str(body) {
1011        Ok(a) => a,
1012        Err(e) => return error_response(400,
1013            format!("body must be a JSON array of Attestation: {e}")),
1014    };
1015    let store = state.store.lock().unwrap();
1016    let log = match store.attestation_log() {
1017        Ok(l) => l,
1018        Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1019    };
1020    let op_log = match lex_vcs::OpLog::open(store.root()) {
1021        Ok(l) => l,
1022        Err(e) => return error_response(500, format!("opening op log: {e}")),
1023    };
1024
1025    // Validate before persisting any record.
1026    for att in &attestations {
1027        // Content-addressing: re-derive attestation_id from the
1028        // payload and reject mismatches.
1029        let expected = lex_vcs::Attestation::with_timestamp(
1030            att.stage_id.clone(),
1031            att.op_id.clone(),
1032            att.intent_id.clone(),
1033            att.kind.clone(),
1034            att.result.clone(),
1035            att.produced_by.clone(),
1036            att.cost.clone(),
1037            att.timestamp,
1038        ).attestation_id;
1039        if expected != att.attestation_id {
1040            return error_with_detail(409, "AttestationIdMismatch", serde_json::json!({
1041                "supplied": att.attestation_id,
1042                "expected": expected,
1043            }));
1044        }
1045        // The op_id field, if set, must point at an op the remote
1046        // knows about. Without this check, attestations would
1047        // dangle into a future sync that never lands the op.
1048        if let Some(op_id) = &att.op_id {
1049            match op_log.get(op_id) {
1050                Ok(Some(_)) => {}
1051                Ok(None) => return error_with_detail(422, "UnknownOp", serde_json::json!({
1052                    "attestation_id": att.attestation_id,
1053                    "op_id": op_id,
1054                })),
1055                Err(e) => return error_response(500, format!("op log read: {e}")),
1056            }
1057        }
1058    }
1059
1060    // Persist. `AttestationLog::put` is idempotent on
1061    // `attestation_id` and the by-stage index is rewritten as a
1062    // marker file, also idempotent.
1063    let mut added = 0usize;
1064    let mut added_ids: Vec<&lex_vcs::AttestationId> = Vec::new();
1065    for att in &attestations {
1066        let already_present = matches!(log.get(&att.attestation_id), Ok(Some(_)));
1067        match log.put(att) {
1068            Ok(()) => {
1069                if !already_present {
1070                    added += 1;
1071                    added_ids.push(&att.attestation_id);
1072                }
1073            }
1074            Err(e) => return error_response(500, format!("attestation log write: {e}")),
1075        }
1076    }
1077
1078    json_response(200, &serde_json::json!({
1079        "received": attestations.len(),
1080        "added": added,
1081        "skipped": attestations.len() - added,
1082        "added_ids": added_ids,
1083    }))
1084}
1085
1086/// `GET /v1/branches/<name>/head` (#242 follow-up). Probe endpoint
1087/// the `lex op push` client uses to discover the remote head before
1088/// computing a delta against `OpLog::ops_since`.
1089///
1090/// Response: `{ "branch": "main", "head_op": Option<OpId> }`.
1091/// Returns 200 even when the branch doesn't exist locally — the
1092/// answer in that case is `head_op: null`, which is the right
1093/// signal for "send everything you have."
1094pub(crate) fn branch_head_handler(state: &State, name: &str)
1095    -> Response<std::io::Cursor<Vec<u8>>>
1096{
1097    let store = state.store.lock().unwrap();
1098    let head = match store.get_branch(name) {
1099        Ok(Some(b)) => b.head_op,
1100        Ok(None) => None,
1101        Err(e) => return error_response(500, format!("get_branch: {e}")),
1102    };
1103    json_response(200, &serde_json::json!({
1104        "branch": name,
1105        "head_op": head,
1106    }))
1107}
1108