Skip to main content

lex_api/
handlers.rs

//! Request routing for the agent API.
//!
//! Each handler in this file is a synchronous function that builds and
//! returns a complete `tiny_http` `Response` itself — successes as 200 with
//! a JSON body, structured errors as 4xx/5xx with a JSON envelope
//! (`{"error": ..., "detail": ...}`). The dispatcher (`handle` → `route`)
//! picks the handler and sends the response it produced.
7
8use indexmap::IndexMap;
9use lex_ast::canonicalize_program;
10use lex_bytecode::{compile_program, vm::Vm, Value};
11use lex_runtime::{check_program as check_policy, DefaultHandler, Policy};
12use lex_store::Store;
13use lex_syntax::load_program_from_str;
14use lex_vcs::{MergeSession, MergeSessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeSet, HashMap};
17use std::path::PathBuf;
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tiny_http::{Header, Method, Request, Response};
21
22pub struct State {
23    pub store: Mutex<Store>,
24    /// Filesystem root of the store. Held alongside the `Store`
25    /// itself so handlers that need to read store-level files
26    /// (e.g. `users.json` for actor auth) don't have to round-
27    /// trip through the lock.
28    pub root: PathBuf,
29    /// In-memory merge sessions, keyed by MergeSessionId. Sessions
30    /// are ephemeral by design (#134 foundation): they live for the
31    /// lifetime of the server process and are GC'd on commit. A
32    /// future slice can persist them to disk so a session survives
33    /// process restarts. For now an agent that gets unlucky with a
34    /// restart re-runs `merge/start` and gets a fresh session.
35    pub sessions: Mutex<HashMap<MergeSessionId, ApiMergeSession>>,
36}
37
38/// Server-side wrapper around [`MergeSession`] carrying the
39/// branch names that started the merge. The lex-vcs session
40/// itself only tracks `OpId` heads; commit needs the dst branch
41/// name to advance the right head, and the src branch name is
42/// kept for round-trip auditability ("which branch did we merge
43/// from?").
44pub struct ApiMergeSession {
45    pub inner: MergeSession,
46    pub src_branch: String,
47    pub dst_branch: String,
48}
49
50impl State {
51    pub fn open(root: PathBuf) -> anyhow::Result<Self> {
52        Ok(Self {
53            store: Mutex::new(Store::open(&root)?),
54            root,
55            sessions: Mutex::new(HashMap::new()),
56        })
57    }
58}
59
60#[derive(Debug, Serialize, Deserialize)]
61struct ErrorEnvelope {
62    error: String,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    detail: Option<serde_json::Value>,
65}
66
67fn json_response(status: u16, body: &serde_json::Value) -> Response<std::io::Cursor<Vec<u8>>> {
68    let bytes = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
69    Response::from_data(bytes)
70        .with_status_code(status)
71        .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
72}
73
74fn error_response(status: u16, msg: impl Into<String>) -> Response<std::io::Cursor<Vec<u8>>> {
75    json_response(status, &serde_json::to_value(ErrorEnvelope {
76        error: msg.into(), detail: None,
77    }).unwrap())
78}
79
80fn error_with_detail(status: u16, msg: impl Into<String>, detail: serde_json::Value)
81    -> Response<std::io::Cursor<Vec<u8>>>
82{
83    json_response(status, &serde_json::to_value(ErrorEnvelope {
84        error: msg.into(), detail: Some(detail),
85    }).unwrap())
86}
87
88/// Map a `StoreError` from a write path (`apply_operation` /
89/// `apply_operation_checked`) to an HTTP response. The only special
90/// case today is `Contention` (#262 multi-writer CAS retries
91/// exhausted), which maps to 503 with a `Retry-After` header so
92/// clients back off rather than hammering the same branch tip.
93fn write_error_response(prefix: &str, err: lex_store::StoreError)
94    -> Response<std::io::Cursor<Vec<u8>>>
95{
96    if let lex_store::StoreError::Contention { branch, attempts } = &err {
97        let body = serde_json::to_vec(&ErrorEnvelope {
98            error: format!("{prefix}: branch '{branch}' is contended (attempts={attempts})"),
99            detail: Some(serde_json::json!({
100                "kind": "contention",
101                "branch": branch,
102                "attempts": attempts,
103            })),
104        }).unwrap_or_else(|_| b"{}".to_vec());
105        return Response::from_data(body)
106            .with_status_code(503)
107            .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
108            .with_header(Header::from_bytes(&b"Retry-After"[..], &b"1"[..]).unwrap());
109    }
110    // #292 slice 3: budget overflow → 503 with `Retry-After: 0`.
111    // Unlike Contention (where a retry might land after another
112    // writer finishes), there's no point retrying a budget-
113    // exceeded op — the caller needs to raise the cap, switch
114    // sessions, or refactor the work. The `Retry-After: 0`
115    // signals "don't bother retrying as-is" while still using
116    // the canonical "service refused" status code.
117    if let lex_store::StoreError::BudgetExceeded { session_id, cap, spent_after } = &err {
118        let body = serde_json::to_vec(&ErrorEnvelope {
119            error: format!(
120                "{prefix}: session `{session_id}` budget exceeded \
121                 (spent_after={spent_after}, cap={cap})"
122            ),
123            detail: Some(serde_json::json!({
124                "kind": "budget_exceeded",
125                "session_id": session_id,
126                "cap": cap,
127                "spent_after": spent_after,
128            })),
129        }).unwrap_or_else(|_| b"{}".to_vec());
130        return Response::from_data(body)
131            .with_status_code(503)
132            .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
133            .with_header(Header::from_bytes(&b"Retry-After"[..], &b"0"[..]).unwrap());
134    }
135    error_response(500, format!("{prefix}: {err}"))
136}
137
138pub fn handle(state: Arc<State>, mut req: Request) -> std::io::Result<()> {
139    let method = req.method().clone();
140    let url = req.url().to_string();
141    let path = url.split('?').next().unwrap_or("").to_string();
142    let query = url.split_once('?').map(|(_, q)| q.to_string()).unwrap_or_default();
143
144    // `X-Lex-User` is the v3d session identifier — set by humans
145    // operating the web UI through whatever proxy fronts auth, or
146    // by AI agents calling the JSON API. We pluck it once here so
147    // every handler can take it as a borrowed string.
148    let x_lex_user = req.headers().iter()
149        .find(|h| h.field.equiv("x-lex-user"))
150        .map(|h| h.value.as_str().to_string());
151
152    let mut body = String::new();
153    let _ = req.as_reader().read_to_string(&mut body);
154
155    let resp = route(&state, &method, &path, &query, &body, x_lex_user.as_deref());
156    req.respond(resp)
157}
158
159fn route(
160    state: &State,
161    method: &Method,
162    path: &str,
163    query: &str,
164    body: &str,
165    x_lex_user: Option<&str>,
166) -> Response<std::io::Cursor<Vec<u8>>> {
167    match (method, path) {
168        // ---- lex-tea v2 (HTML browser) ------------------------
169        (Method::Get, "/") => crate::web::activity_handler(state),
170        (Method::Get, "/web/branches") => crate::web::branches_handler(state),
171        (Method::Get, "/web/trust") => crate::web::trust_handler(state),
172        (Method::Get, "/web/attention") => crate::web::attention_handler(state),
173        (Method::Get, p) if p.starts_with("/web/branch/") => {
174            let name = &p["/web/branch/".len()..];
175            crate::web::branch_handler(state, name)
176        }
177        (Method::Get, p) if p.starts_with("/web/stage/") => {
178            let id = &p["/web/stage/".len()..];
179            crate::web::stage_html_handler(state, id)
180        }
181        // lex-tea v3 human-triage actions (#172). HTML forms post
182        // to /web/stage/<id>/{pin,defer,block,unblock} with a
183        // `reason` body. All four share one handler; the verb in
184        // the path picks the AttestationKind.
185        (Method::Post, p) if p.starts_with("/web/stage/") && (
186            p.ends_with("/pin") || p.ends_with("/defer")
187            || p.ends_with("/block") || p.ends_with("/unblock")
188        ) => {
189            let prefix_len = "/web/stage/".len();
190            let last_slash = p.rfind('/').unwrap_or(p.len());
191            let id = &p[prefix_len..last_slash];
192            let verb = &p[last_slash + 1..];
193            let decision = match verb {
194                "pin"     => crate::web::WebStageDecision::Pin,
195                "defer"   => crate::web::WebStageDecision::Defer,
196                "block"   => crate::web::WebStageDecision::Block,
197                "unblock" => crate::web::WebStageDecision::Unblock,
198                _ => unreachable!("matched in outer guard"),
199            };
200            crate::web::stage_decision_handler(state, id, body, decision, x_lex_user)
201        }
202        // ---- JSON API -----------------------------------------
203        (Method::Get, "/v1/health") => json_response(200, &serde_json::json!({"ok": true})),
204        (Method::Post, "/v1/parse") => parse_handler(body),
205        (Method::Post, "/v1/check") => check_handler(body),
206        (Method::Post, "/v1/publish") => publish_handler(state, body),
207        (Method::Post, "/v1/patch") => patch_handler(state, body),
208        (Method::Get, p) if p.starts_with("/v1/stage/") => {
209            let suffix = &p["/v1/stage/".len()..];
210            // Match `/v1/stage/<id>/attestations` first so a literal
211            // stage_id of "attestations" can't be misrouted.
212            if let Some(id) = suffix.strip_suffix("/attestations") {
213                stage_attestations_handler(state, id)
214            } else {
215                stage_handler(state, suffix)
216            }
217        }
218        (Method::Post, "/v1/run") => run_handler(state, body, false),
219        (Method::Post, "/v1/replay") => run_handler(state, body, true),
220        (Method::Get, p) if p.starts_with("/v1/trace/") => {
221            let id = &p["/v1/trace/".len()..];
222            trace_handler(state, id)
223        }
224        (Method::Get, "/v1/diff") => diff_handler(state, query),
225        (Method::Post, "/v1/merge/start") => merge_start_handler(state, body),
226        (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/resolve") => {
227            let id = &p["/v1/merge/".len()..p.len() - "/resolve".len()];
228            merge_resolve_handler(state, id, body)
229        }
230        (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/commit") => {
231            let id = &p["/v1/merge/".len()..p.len() - "/commit".len()];
232            merge_commit_handler(state, id)
233        }
234        // ---- #242: append-only sync of op log + attestation log
235        (Method::Post, "/v1/ops/batch") => ops_batch_handler(state, body),
236        (Method::Post, "/v1/attestations/batch") => attestations_batch_handler(state, body),
237        // Probe endpoint for `lex op push` to discover the remote's
238        // current head before computing a delta. Returns
239        // `{ "head_op": Option<OpId> }`. `<branch>` is URL-encoded.
240        (Method::Get, p) if p.starts_with("/v1/branches/") && p.ends_with("/head") => {
241            let name = &p["/v1/branches/".len()..p.len() - "/head".len()];
242            branch_head_handler(state, name)
243        }
244        // ---- #260: append-only fetch (inverse of #242 push)
245        // Body is a JSON array of OperationRecords reachable from
246        // `branch.head_op` but not from `after`, oldest-first.
247        (Method::Get, "/v1/ops/since") => ops_since_handler(state, query),
248        (Method::Get, "/v1/attestations/since") => attestations_since_handler(state, query),
249        _ => error_response(404, format!("unknown route: {method:?} {path}")),
250    }
251}
252
253#[derive(Deserialize)]
254struct ParseReq { source: String }
255
256fn parse_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
257    let req: ParseReq = match serde_json::from_str(body) {
258        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
259    };
260    match load_program_from_str(&req.source) {
261        Ok(prog) => {
262            let stages = canonicalize_program(&prog);
263            json_response(200, &serde_json::to_value(&stages).unwrap())
264        }
265        Err(e) => error_response(400, format!("syntax error: {e}")),
266    }
267}
268
269pub(crate) fn check_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
270    let req: ParseReq = match serde_json::from_str(body) {
271        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
272    };
273    let prog = match load_program_from_str(&req.source) {
274        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
275    };
276    let stages = canonicalize_program(&prog);
277    match lex_types::check_program(&stages) {
278        Ok(_) => json_response(200, &serde_json::json!({"ok": true})),
279        Err(errs) => json_response(422, &serde_json::to_value(&errs).unwrap()),
280    }
281}
282
283#[derive(Deserialize)]
284struct PublishReq { source: String, #[serde(default)] activate: bool }
285
286pub(crate) fn publish_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
287    let req: PublishReq = match serde_json::from_str(body) {
288        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
289    };
290    let prog = match load_program_from_str(&req.source) {
291        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
292    };
293    // #168: rewrite stdlib parse calls to parse_strict so the
294    // bytecode emitted from these stages enforces required-field
295    // checks at runtime.
296    let mut stages = canonicalize_program(&prog);
297    if let Err(errs) = lex_types::check_and_rewrite_program(&mut stages) {
298        return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
299    }
300
301    let store = state.store.lock().unwrap();
302    let branch = store.current_branch();
303
304    // Compute diff between what's already on the branch and the new program.
305    let old_head = match store.branch_head(&branch) {
306        Ok(h) => h,
307        Err(e) => return error_response(500, format!("branch_head: {e}")),
308    };
309    let old_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = old_head.values()
310        .filter_map(|stg| store.get_ast(stg).ok())
311        .filter_map(|s| match s {
312            lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd)),
313            _ => None,
314        })
315        .collect();
316    let new_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = stages.iter()
317        .filter_map(|s| match s {
318            lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd.clone())),
319            _ => None,
320        })
321        .collect();
322    let report = lex_vcs::compute_diff(&old_fns, &new_fns, false);
323
324    // Build new imports map from any Import stages in the source.
325    let mut new_imports: lex_vcs::ImportMap = lex_vcs::ImportMap::new();
326    {
327        let entry = new_imports.entry("<source>".into()).or_default();
328        for s in &stages {
329            if let lex_ast::Stage::Import(im) = s {
330                entry.insert(im.reference.clone());
331            }
332        }
333    }
334
335    match store.publish_program(&branch, &stages, &report, &new_imports, req.activate) {
336        Ok(outcome) => json_response(200, &serde_json::json!({
337            "ops": outcome.ops,
338            "head_op": outcome.head_op,
339        })),
340        // The store-write gate (#130) also type-checks at the top
341        // of `publish_program`. The handler above already pre-checks,
342        // so this branch is reached only on a race or a state we
343        // didn't see at handler time. Surface the structured
344        // envelope (422) instead of a generic 500 — same shape the
345        // initial pre-check uses, so a client only has one error
346        // contract to handle.
347        Err(lex_store::StoreError::TypeError(errs)) => {
348            error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap())
349        }
350        Err(e) => write_error_response("publish_program", e),
351    }
352}
353
354#[derive(Deserialize)]
355struct PatchReq {
356    stage_id: String,
357    patch: lex_ast::Patch,
358    #[serde(default)] activate: bool,
359}
360
361/// POST /v1/patch — apply a structured edit to a stored stage's
362/// canonical AST, type-check the result, and publish a new stage.
363fn patch_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
364    let req: PatchReq = match serde_json::from_str(body) {
365        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
366    };
367    let store = state.store.lock().unwrap();
368
369    // 1. Load.
370    let original = match store.get_ast(&req.stage_id) {
371        Ok(s) => s, Err(e) => return error_response(404, format!("stage: {e}")),
372    };
373
374    // 2. Apply.
375    let patched = match lex_ast::apply_patch(&original, &req.patch) {
376        Ok(s) => s,
377        Err(e) => return error_with_detail(422, "patch failed",
378            serde_json::to_value(&e).unwrap_or_default()),
379    };
380
381    // 3. Type-check the new stage in isolation.
382    let stages = vec![patched.clone()];
383    if let Err(errs) = lex_types::check_program(&stages) {
384        return error_with_detail(422, "type errors after patch",
385            serde_json::to_value(&errs).unwrap_or_default());
386    }
387
388    // Routing through apply_operation so /v1/patch participates in
389    // the op DAG. We know this op is always a body change on the
390    // existing sig (a patch can't add a brand-new fn).
391    let branch = store.current_branch();
392
393    // Find the sig — patched stage's sig must match the original's.
394    let sig = match lex_ast::sig_id(&patched) {
395        Some(s) => s,
396        None => return error_response(500, "patched stage has no sig_id"),
397    };
398
399    let new_id = match store.publish(&patched) {
400        Ok(id) => id, Err(e) => return error_response(500, format!("publish: {e}")),
401    };
402    if req.activate {
403        if let Err(e) = store.activate(&new_id) {
404            return error_response(500, format!("activate: {e}"));
405        }
406    }
407
408    // Determine op kind: ChangeEffectSig if effects differ, ModifyBody otherwise.
409    let original_effects: std::collections::BTreeSet<String> = match &original {
410        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
411        _ => std::collections::BTreeSet::new(),
412    };
413    let patched_effects: std::collections::BTreeSet<String> = match &patched {
414        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
415        _ => std::collections::BTreeSet::new(),
416    };
417    let head_now = match store.get_branch(&branch) {
418        Ok(b) => b.and_then(|b| b.head_op),
419        Err(e) => return error_response(500, format!("get_branch: {e}")),
420    };
421    let kind = if original_effects != patched_effects {
422        // #247: budget delta is part of the canonical payload now.
423        // Patch endpoints don't currently rehydrate the AST to
424        // recompute budgets, so leave them None — clients that
425        // need budget tracking should publish through the diff
426        // pipeline (`lex publish`) where `compute_diff` populates
427        // them.
428        let from_budget = lex_vcs::operation_budget_from_effects(&original_effects);
429        let to_budget = lex_vcs::operation_budget_from_effects(&patched_effects);
430        lex_vcs::OperationKind::ChangeEffectSig {
431            sig_id: sig.clone(),
432            from_stage_id: req.stage_id.clone(),
433            to_stage_id: new_id.clone(),
434            from_effects: original_effects,
435            to_effects: patched_effects,
436            from_budget,
437            to_budget,
438        }
439    } else {
440        let budget = lex_vcs::operation_budget_from_effects(&original_effects);
441        lex_vcs::OperationKind::ModifyBody {
442            sig_id: sig.clone(),
443            from_stage_id: req.stage_id.clone(),
444            to_stage_id: new_id.clone(),
445            from_budget: budget,
446            to_budget: budget,
447        }
448    };
449    let transition = lex_vcs::StageTransition::Replace {
450        sig_id: sig.clone(),
451        from: req.stage_id.clone(),
452        to: new_id.clone(),
453    };
454    let op = lex_vcs::Operation::new(
455        kind,
456        head_now.into_iter().collect::<Vec<_>>(),
457    );
458    let op_id = match store.apply_operation(&branch, op, transition) {
459        Ok(id) => id,
460        Err(e) => return write_error_response("apply_operation", e),
461    };
462
463    let status = format!("{:?}",
464        store.get_status(&new_id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
465    json_response(200, &serde_json::json!({
466        "old_stage_id": req.stage_id,
467        "new_stage_id": new_id,
468        "sig_id": sig,
469        "status": status,
470        "op_id": op_id,
471    }))
472}
473
474pub(crate) fn stage_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
475    let store = state.store.lock().unwrap();
476    let meta = match store.get_metadata(id) {
477        Ok(m) => m, Err(e) => return error_response(404, format!("{e}")),
478    };
479    let ast = match store.get_ast(id) {
480        Ok(a) => a, Err(e) => return error_response(404, format!("{e}")),
481    };
482    let status = format!("{:?}", store.get_status(id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
483    json_response(200, &serde_json::json!({
484        "metadata": meta,
485        "ast": ast,
486        "status": status,
487    }))
488}
489
490/// `GET /v1/stage/<id>/attestations` — every persisted attestation
491/// for this stage, newest-first by timestamp. Issue #132's
492/// queryable-evidence consumer surface.
493///
494/// 404s on unknown stage_id (matches `/v1/stage/<id>`'s shape so a
495/// caller round-tripping both endpoints sees consistent errors).
496/// Empty list (200) is *evidence of absence*: the stage exists but
497/// no producer has attested it.
498pub(crate) fn stage_attestations_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
499    let store = state.store.lock().unwrap();
500    if let Err(e) = store.get_metadata(id) {
501        return error_response(404, format!("{e}"));
502    }
503    let log = match store.attestation_log() {
504        Ok(l) => l,
505        Err(e) => return error_response(500, format!("attestation log: {e}")),
506    };
507    let mut listing = match log.list_for_stage(&id.to_string()) {
508        Ok(v) => v,
509        Err(e) => return error_response(500, format!("list_for_stage: {e}")),
510    };
511    listing.sort_by_key(|a| std::cmp::Reverse(a.timestamp));
512    json_response(200, &serde_json::json!({"attestations": listing}))
513}
514
515#[derive(Deserialize, Default)]
516struct PolicyJson {
517    #[serde(default)] allow_effects: Vec<String>,
518    #[serde(default)] allow_fs_read: Vec<String>,
519    #[serde(default)] allow_fs_write: Vec<String>,
520    #[serde(default)] budget: Option<u64>,
521}
522
523impl PolicyJson {
524    fn into_policy(self) -> Policy {
525        Policy {
526            allow_effects: self.allow_effects.into_iter().collect::<BTreeSet<_>>(),
527            allow_fs_read: self.allow_fs_read.into_iter().map(PathBuf::from).collect(),
528            allow_fs_write: self.allow_fs_write.into_iter().map(PathBuf::from).collect(),
529            allow_net_host: Vec::new(),
530            allow_proc: Vec::new(),
531            budget: self.budget,
532        }
533    }
534}
535
536#[derive(Deserialize)]
537struct RunReq {
538    source: String,
539    #[serde(rename = "fn")] func: String,
540    #[serde(default)] args: Vec<serde_json::Value>,
541    #[serde(default)] policy: PolicyJson,
542    #[serde(default)] overrides: IndexMap<String, serde_json::Value>,
543}
544
545pub(crate) fn run_handler(state: &State, body: &str, with_overrides: bool) -> Response<std::io::Cursor<Vec<u8>>> {
546    let req: RunReq = match serde_json::from_str(body) {
547        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
548    };
549    let prog = match load_program_from_str(&req.source) {
550        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
551    };
552    let stages = canonicalize_program(&prog);
553    if let Err(errs) = lex_types::check_program(&stages) {
554        return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
555    }
556    let bc = compile_program(&stages);
557    let policy = req.policy.into_policy();
558    if let Err(violations) = check_policy(&bc, &policy) {
559        return error_with_detail(403, "policy violation", serde_json::to_value(&violations).unwrap());
560    }
561
562    let mut recorder = lex_trace::Recorder::new();
563    if with_overrides && !req.overrides.is_empty() {
564        recorder = recorder.with_overrides(req.overrides);
565    }
566    let handle = recorder.handle();
567    let handler = DefaultHandler::new(policy);
568    let mut vm = Vm::with_handler(&bc, Box::new(handler));
569    vm.set_tracer(Box::new(recorder));
570
571    let vargs: Vec<Value> = req.args.iter().map(json_to_value).collect();
572    let started = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
573    let result = vm.call(&req.func, vargs);
574    let ended = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
575
576    let store = state.store.lock().unwrap();
577    let (root_out, root_err, status) = match &result {
578        Ok(v) => (Some(value_to_json(v)), None, 200u16),
579        Err(e) => (None, Some(format!("{e}")), 200u16),
580    };
581    let tree = handle.finalize(req.func.clone(), serde_json::Value::Null,
582        root_out.clone(), root_err.clone(), started, ended);
583    let run_id = match store.save_trace(&tree) {
584        Ok(id) => id,
585        Err(e) => return error_response(500, format!("save_trace: {e}")),
586    };
587
588    let mut body = serde_json::json!({
589        "run_id": run_id,
590        "output": root_out,
591    });
592    if let Some(err) = root_err {
593        body["error"] = serde_json::Value::String(err);
594    }
595    json_response(status, &body)
596}
597
598fn trace_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
599    let store = state.store.lock().unwrap();
600    match store.load_trace(id) {
601        Ok(t) => json_response(200, &serde_json::to_value(&t).unwrap()),
602        Err(e) => error_response(404, format!("{e}")),
603    }
604}
605
606fn diff_handler(state: &State, query: &str) -> Response<std::io::Cursor<Vec<u8>>> {
607    let mut a = None;
608    let mut b = None;
609    for kv in query.split('&') {
610        if let Some((k, v)) = kv.split_once('=') {
611            match k { "a" => a = Some(v.to_string()), "b" => b = Some(v.to_string()), _ => {} }
612        }
613    }
614    let (Some(a), Some(b)) = (a, b) else {
615        return error_response(400, "missing a or b query params");
616    };
617    let store = state.store.lock().unwrap();
618    let ta = match store.load_trace(&a) { Ok(t) => t, Err(e) => return error_response(404, format!("a: {e}")) };
619    let tb = match store.load_trace(&b) { Ok(t) => t, Err(e) => return error_response(404, format!("b: {e}")) };
620    match lex_trace::diff_runs(&ta, &tb) {
621        Some(d) => json_response(200, &serde_json::to_value(&d).unwrap()),
622        None => json_response(200, &serde_json::json!({"divergence": null})),
623    }
624}
625
626fn json_to_value(v: &serde_json::Value) -> Value { Value::from_json(v) }
627
628fn value_to_json(v: &Value) -> serde_json::Value { v.to_json() }
629
630#[derive(Deserialize)]
631struct MergeStartReq {
632    src_branch: String,
633    dst_branch: String,
634}
635
636/// `POST /v1/merge/start` (#134) — open a stateful merge between two
637/// branch heads and return the conflicts the agent needs to
638/// resolve. Auto-resolved sigs (one-sided changes, identical
639/// changes both sides) are returned for audit but don't block
640/// commit.
641///
642/// Response: `{ merge_id, src_head, dst_head, lca, conflicts,
643/// auto_resolved_count }`. The session is held in process memory
644/// keyed by `merge_id` for subsequent `resolve` / `commit` calls
645/// (next slices).
646fn merge_start_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
647    let req: MergeStartReq = match serde_json::from_str(body) {
648        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
649    };
650    let store = state.store.lock().unwrap();
651    let src_head = match store.get_branch(&req.src_branch) {
652        Ok(Some(b)) => b.head_op,
653        Ok(None) => return error_response(404, format!("unknown src branch `{}`", req.src_branch)),
654        Err(e) => return error_response(500, format!("src branch read: {e}")),
655    };
656    let dst_head = match store.get_branch(&req.dst_branch) {
657        Ok(Some(b)) => b.head_op,
658        Ok(None) => return error_response(404, format!("unknown dst branch `{}`", req.dst_branch)),
659        Err(e) => return error_response(500, format!("dst branch read: {e}")),
660    };
661    let log = match lex_vcs::OpLog::open(store.root()) {
662        Ok(l) => l,
663        Err(e) => return error_response(500, format!("op log: {e}")),
664    };
665    // Caller doesn't choose merge_ids — minted server-side from
666    // wall clock + a per-process counter avoids leaking session
667    // ids' shape into the public surface.
668    let merge_id = mint_merge_id();
669    let session = match MergeSession::start(
670        merge_id.clone(),
671        &log,
672        src_head.as_ref(),
673        dst_head.as_ref(),
674    ) {
675        Ok(s) => s,
676        Err(e) => return error_response(500, format!("merge start: {e}")),
677    };
678    let conflicts: Vec<&lex_vcs::ConflictRecord> = session.remaining_conflicts();
679    let auto_resolved_count = session.auto_resolved.len();
680    let body = serde_json::json!({
681        "merge_id": merge_id,
682        "src_head": session.src_head,
683        "dst_head": session.dst_head,
684        "lca":      session.lca,
685        "conflicts": conflicts,
686        "auto_resolved_count": auto_resolved_count,
687    });
688    drop(conflicts);
689    drop(store);
690    let wrapped = ApiMergeSession {
691        inner: session,
692        src_branch: req.src_branch,
693        dst_branch: req.dst_branch,
694    };
695    state.sessions.lock().unwrap().insert(merge_id, wrapped);
696    json_response(200, &body)
697}
698
699#[derive(Deserialize)]
700struct MergeResolveReq {
701    /// Each entry is `(conflict_id, resolution)`. The resolution is
702    /// the same shape as `lex_vcs::Resolution`'s tagged JSON form
703    /// — `{"kind":"take_ours"}`, `{"kind":"take_theirs"}`,
704    /// `{"kind":"defer"}`, or `{"kind":"custom","op":{...}}`.
705    resolutions: Vec<MergeResolveEntry>,
706}
707
708#[derive(Deserialize)]
709struct MergeResolveEntry {
710    conflict_id: String,
711    resolution: lex_vcs::Resolution,
712}
713
714/// `POST /v1/merge/<id>/resolve` (#134) — submit batched
715/// resolutions against the conflicts surfaced by `merge/start`.
716/// Returns one verdict per input: accepted (recorded against the
717/// session) or rejected (with structured reason). The session
718/// stays alive across calls so an agent can iterate.
719///
720/// Errors:
721/// - 404 if `merge_id` doesn't refer to a live session (a typo
722///   or a session GC'd by a server restart).
723/// - 400 on malformed body.
724fn merge_resolve_handler(
725    state: &State,
726    merge_id: &str,
727    body: &str,
728) -> Response<std::io::Cursor<Vec<u8>>> {
729    let req: MergeResolveReq = match serde_json::from_str(body) {
730        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
731    };
732    let mut sessions = state.sessions.lock().unwrap();
733    let Some(wrapped) = sessions.get_mut(merge_id) else {
734        return error_response(404, format!("unknown merge_id `{merge_id}`"));
735    };
736    let pairs: Vec<(String, lex_vcs::Resolution)> = req.resolutions.into_iter()
737        .map(|e| (e.conflict_id, e.resolution))
738        .collect();
739    let verdicts = wrapped.inner.resolve(pairs);
740    let remaining: Vec<&lex_vcs::ConflictRecord> = wrapped.inner.remaining_conflicts();
741    let body = serde_json::json!({
742        "verdicts": verdicts,
743        "remaining_conflicts": remaining,
744    });
745    json_response(200, &body)
746}
747
748/// `POST /v1/merge/<id>/commit` (#134) — finalize a merge
749/// session. Builds a `Merge` op from the auto-resolved sigs +
750/// the conflict resolutions, applies it to the dst branch, and
751/// returns the new head op id. The session is dropped on
752/// success; the caller would re-run `merge/start` to land
753/// further changes.
754///
755/// Errors:
756/// - 404: unknown `merge_id`.
757/// - 422: conflicts remaining (pass `Defer` or just don't
758///   resolve a conflict and you land here). Body carries the
759///   list so the caller knows which still need attention.
760/// - 422: a `Custom` resolution was used. The data layer
761///   supports them but landing them via HTTP needs an extra
762///   pass to apply the custom op against the dst branch
763///   first; deferred to a follow-up slice. Use TakeOurs /
764///   TakeTheirs for now.
765/// - 500: filesystem error while landing the merge op.
766fn merge_commit_handler(
767    state: &State,
768    merge_id: &str,
769) -> Response<std::io::Cursor<Vec<u8>>> {
770    use std::collections::BTreeMap;
771    let wrapped = match state.sessions.lock().unwrap().remove(merge_id) {
772        Some(w) => w,
773        None => return error_response(404, format!("unknown merge_id `{merge_id}`")),
774    };
775    let dst_branch = wrapped.dst_branch.clone();
776    let src_head = wrapped.inner.src_head.clone();
777    let dst_head = wrapped.inner.dst_head.clone();
778    let auto_resolved = wrapped.inner.auto_resolved.clone();
779
780    // Translate auto-resolved + resolutions into the StageTransition::Merge
781    // entries map. Only sigs whose head changes relative to dst go in.
782    let mut entries: BTreeMap<lex_vcs::SigId, Option<lex_vcs::StageId>> = BTreeMap::new();
783
784    // Auto-resolved: only `Src` (one-sided change on src) modifies dst.
785    for outcome in &auto_resolved {
786        if let lex_vcs::MergeOutcome::Src { sig_id, stage_id } = outcome {
787            entries.insert(sig_id.clone(), stage_id.clone());
788        }
789    }
790
791    // Conflict resolutions.
792    let resolved = match wrapped.inner.commit() {
793        Ok(r) => r,
794        Err(lex_vcs::CommitError::ConflictsRemaining(ids)) => {
795            // Re-insert isn't possible since we removed above; the
796            // caller will need to re-start. That's acceptable: a
797            // commit-with-unresolved-conflicts is operator error.
798            return error_with_detail(
799                422,
800                "conflicts remaining",
801                serde_json::json!({"unresolved": ids}),
802            );
803        }
804    };
805
806    for (conflict_id, resolution) in resolved {
807        match resolution {
808            lex_vcs::Resolution::TakeOurs => {
809                // Dst already has its head. No entry needed.
810            }
811            lex_vcs::Resolution::TakeTheirs => {
812                // Find the conflict's `theirs` stage_id in the
813                // session snapshot. We don't have direct access to
814                // it post-commit (commit consumed the session); but
815                // we can reconstruct from `auto_resolved` plus the
816                // session's pre-commit conflict map. Since we
817                // already moved the inner session, the cleanest fix
818                // for this slice is to rebuild from the on-disk
819                // graph: walk src_head, find the latest stage for
820                // the conflict's sig.
821                match resolve_take_theirs(state, &src_head, &conflict_id) {
822                    Ok(stage_id) => {
823                        entries.insert(conflict_id.clone(), stage_id);
824                    }
825                    Err(e) => return error_response(500, format!("resolve take_theirs: {e}")),
826                }
827            }
828            lex_vcs::Resolution::Custom { op } => {
829                // The agent's brand-new op carries the merge target
830                // in its kind (e.g. ModifyBody.to_stage_id). The op
831                // itself isn't separately recorded in the log here
832                // — its head-map effect is folded into the merge
833                // op's entries map. Callers that want the op as a
834                // first-class history entry should publish it via
835                // /v1/publish first and submit a TakeTheirs/TakeOurs
836                // resolution against the resulting head.
837                match op.kind.merge_target() {
838                    Some((sig, stage)) => {
839                        if sig != conflict_id {
840                            return error_with_detail(
841                                422,
842                                "custom op targets a different sig than the conflict",
843                                serde_json::json!({
844                                    "conflict_id": conflict_id,
845                                    "op_targets": sig,
846                                }),
847                            );
848                        }
849                        entries.insert(conflict_id, stage);
850                    }
851                    None => {
852                        return error_with_detail(
853                            422,
854                            "custom op kind doesn't yield a single sig→stage delta",
855                            serde_json::json!({
856                                "conflict_id": conflict_id,
857                                "kind": serde_json::to_value(&op.kind).unwrap_or(serde_json::Value::Null),
858                            }),
859                        );
860                    }
861                }
862            }
863            lex_vcs::Resolution::Defer => {
864                // Unreachable: commit() rejects Defer above.
865                return error_response(500, "internal: Defer slipped past commit gate");
866            }
867        }
868    }
869
870    let resolved_count = entries.len();
871    let mut parents: Vec<lex_vcs::OpId> = Vec::new();
872    if let Some(d) = dst_head { parents.push(d); }
873    if let Some(s) = src_head { parents.push(s); }
874    let op = lex_vcs::Operation::new(
875        lex_vcs::OperationKind::Merge { resolved: resolved_count },
876        parents,
877    );
878    let transition = lex_vcs::StageTransition::Merge { entries };
879    let store = state.store.lock().unwrap();
880    match store.apply_operation(&dst_branch, op, transition) {
881        Ok(new_head_op) => json_response(200, &serde_json::json!({
882            "new_head_op": new_head_op,
883            "dst_branch": dst_branch,
884        })),
885        Err(e) => write_error_response("apply merge op", e),
886    }
887}
888
889/// Walk the op log from `src_head` backwards to find the latest
890/// stage assigned to `sig`. Used by the commit handler to figure
891/// out what stage `TakeTheirs` should land. `Ok(None)` means src
892/// removed the sig.
893fn resolve_take_theirs(
894    state: &State,
895    src_head: &Option<lex_vcs::OpId>,
896    sig: &lex_vcs::SigId,
897) -> std::io::Result<Option<lex_vcs::StageId>> {
898    let store = state.store.lock().unwrap();
899    let log = lex_vcs::OpLog::open(store.root())?;
900    let Some(head) = src_head.as_ref() else { return Ok(None); };
901    // Walk forward from root → head, replaying each op's transition
902    // for `sig`; the last assignment wins.
903    let mut current: Option<lex_vcs::StageId> = None;
904    for record in log.walk_forward(head, None)? {
905        match &record.produces {
906            lex_vcs::StageTransition::Create { sig_id, stage_id }
907                if sig_id == sig => { current = Some(stage_id.clone()); }
908            lex_vcs::StageTransition::Replace { sig_id, to, .. }
909                if sig_id == sig => { current = Some(to.clone()); }
910            lex_vcs::StageTransition::Remove { sig_id, .. }
911                if sig_id == sig => { current = None; }
912            lex_vcs::StageTransition::Rename { from, to, body_stage_id }
913                if from == sig || to == sig => {
914                if from == sig { current = None; }
915                if to == sig   { current = Some(body_stage_id.clone()); }
916            }
917            lex_vcs::StageTransition::Merge { entries } => {
918                if let Some(opt) = entries.get(sig) {
919                    current = opt.clone();
920                }
921            }
922            _ => {}
923        }
924    }
925    Ok(current)
926}
927
928fn mint_merge_id() -> MergeSessionId {
929    use std::sync::atomic::{AtomicU64, Ordering};
930    static COUNTER: AtomicU64 = AtomicU64::new(0);
931    let nanos = SystemTime::now()
932        .duration_since(UNIX_EPOCH)
933        .map(|d| d.as_nanos())
934        .unwrap_or(0);
935    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
936    format!("merge_{nanos:x}_{n:x}")
937}
938
939// ---- #242: append-only sync ---------------------------------------
940
941/// `POST /v1/ops/batch` (#242). Server endpoint for `lex op push`.
942///
943/// Body: a JSON array of `OperationRecord`s. The handler validates
944/// DAG integrity by checking that every op's `parents` either
945/// already exist on the remote *or* appear earlier in the same
946/// batch. This lets a client send a topologically-ordered slice
947/// without first probing for what's already there.
948///
949/// Response shape:
950///
951/// ```json
952/// { "received": N, "added": M, "skipped": (N-M), "added_ids": [...] }
953/// ```
954///
955/// Failure modes:
956///
957/// * `400` — body isn't a JSON array of op records.
958/// * `422` with `{ "error": "MissingParent", "detail": { "op_id":
959///   ..., "missing_parent": ... } }` if a parent is unreachable.
960///   The whole batch is rejected; nothing is persisted. The client
961///   should backfill the missing op and retry.
962/// * `409` if the supplied `op_id` doesn't match the canonical
963///   hash of the record's payload — content addressing must hold
964///   over the wire.
965///
966/// Idempotency: a record whose `op_id` already exists is silently
967/// skipped (not added, not rejected). Pushing the same payload
968/// twice is `received == N, added == 0` on the second call.
969pub(crate) fn ops_batch_handler(state: &State, body: &str)
970    -> Response<std::io::Cursor<Vec<u8>>>
971{
972    let records: Vec<lex_vcs::OperationRecord> = match serde_json::from_str(body) {
973        Ok(r) => r,
974        Err(e) => return error_response(400,
975            format!("body must be a JSON array of OperationRecord: {e}")),
976    };
977    let store = state.store.lock().unwrap();
978    let log = match lex_vcs::OpLog::open(store.root()) {
979        Ok(l) => l,
980        Err(e) => return error_response(500, format!("opening op log: {e}")),
981    };
982
983    // Validate every record before persisting any of them.
984    //
985    // 1. Content-addressing: the supplied `op_id` must match the
986    //    canonical hash of `record.op`. Otherwise the client is
987    //    sending a forged or corrupted record.
988    // 2. DAG integrity: every parent must either already exist in
989    //    the local log OR appear earlier in this batch.
990    let mut batch_ids: std::collections::BTreeSet<lex_vcs::OpId> =
991        std::collections::BTreeSet::new();
992    for rec in &records {
993        let expected = rec.op.op_id();
994        if expected != rec.op_id {
995            return error_with_detail(409, "OpIdMismatch", serde_json::json!({
996                "supplied": rec.op_id,
997                "expected": expected,
998            }));
999        }
1000        for parent in &rec.op.parents {
1001            let known = match log.get(parent) {
1002                Ok(Some(_)) => true,
1003                Ok(None) => false,
1004                Err(e) => return error_response(500, format!("op log read: {e}")),
1005            };
1006            if !known && !batch_ids.contains(parent) {
1007                return error_with_detail(422, "MissingParent", serde_json::json!({
1008                    "op_id": rec.op_id,
1009                    "missing_parent": parent,
1010                }));
1011            }
1012        }
1013        batch_ids.insert(rec.op_id.clone());
1014    }
1015
1016    // Persist. `OpLog::put` is idempotent so a re-push is a no-op
1017    // for already-present records.
1018    let mut added = 0usize;
1019    let mut added_ids: Vec<&lex_vcs::OpId> = Vec::new();
1020    for rec in &records {
1021        let already_present = matches!(log.get(&rec.op_id), Ok(Some(_)));
1022        match log.put(rec) {
1023            Ok(()) => {
1024                if !already_present {
1025                    added += 1;
1026                    added_ids.push(&rec.op_id);
1027                }
1028            }
1029            Err(e) => return error_response(500, format!("op log write: {e}")),
1030        }
1031    }
1032
1033    json_response(200, &serde_json::json!({
1034        "received": records.len(),
1035        "added": added,
1036        "skipped": records.len() - added,
1037        "added_ids": added_ids,
1038    }))
1039}
1040
1041/// `POST /v1/attestations/batch` (#242). Server endpoint for `lex
1042/// attest push`.
1043///
1044/// Body: a JSON array of `Attestation`s. Validates that each
1045/// attestation's `op_id` (when set) refers to an op that already
1046/// exists on the remote — `attestation_id` is then re-derivable
1047/// from the canonical form, so cross-store dedup just works.
1048///
1049/// Response: same shape as `ops_batch_handler` but `added_ids` is
1050/// the list of accepted `attestation_id`s.
1051///
1052/// Failure modes:
1053///
1054/// * `400` for malformed JSON.
1055/// * `422` with `{ "error": "UnknownOp", "detail": { ... } }` if
1056///   an attestation's `op_id` references an op the remote doesn't
1057///   know about. Whole batch rejected.
1058/// * `409` `AttestationIdMismatch` if the supplied id doesn't
1059///   match the canonical hash.
1060///
1061/// Idempotency: same as the ops endpoint — content-addressed dedup.
1062pub(crate) fn attestations_batch_handler(state: &State, body: &str)
1063    -> Response<std::io::Cursor<Vec<u8>>>
1064{
1065    let attestations: Vec<lex_vcs::Attestation> = match serde_json::from_str(body) {
1066        Ok(a) => a,
1067        Err(e) => return error_response(400,
1068            format!("body must be a JSON array of Attestation: {e}")),
1069    };
1070    let store = state.store.lock().unwrap();
1071    let log = match store.attestation_log() {
1072        Ok(l) => l,
1073        Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1074    };
1075    let op_log = match lex_vcs::OpLog::open(store.root()) {
1076        Ok(l) => l,
1077        Err(e) => return error_response(500, format!("opening op log: {e}")),
1078    };
1079
1080    // Validate before persisting any record.
1081    for att in &attestations {
1082        // Content-addressing: re-derive attestation_id from the
1083        // payload and reject mismatches.
1084        let expected = lex_vcs::Attestation::with_timestamp(
1085            att.stage_id.clone(),
1086            att.op_id.clone(),
1087            att.intent_id.clone(),
1088            att.kind.clone(),
1089            att.result.clone(),
1090            att.produced_by.clone(),
1091            att.cost.clone(),
1092            att.timestamp,
1093        ).attestation_id;
1094        if expected != att.attestation_id {
1095            return error_with_detail(409, "AttestationIdMismatch", serde_json::json!({
1096                "supplied": att.attestation_id,
1097                "expected": expected,
1098            }));
1099        }
1100        // The op_id field, if set, must point at an op the remote
1101        // knows about. Without this check, attestations would
1102        // dangle into a future sync that never lands the op.
1103        if let Some(op_id) = &att.op_id {
1104            match op_log.get(op_id) {
1105                Ok(Some(_)) => {}
1106                Ok(None) => return error_with_detail(422, "UnknownOp", serde_json::json!({
1107                    "attestation_id": att.attestation_id,
1108                    "op_id": op_id,
1109                })),
1110                Err(e) => return error_response(500, format!("op log read: {e}")),
1111            }
1112        }
1113    }
1114
1115    // Persist. `AttestationLog::put` is idempotent on
1116    // `attestation_id` and the by-stage index is rewritten as a
1117    // marker file, also idempotent.
1118    let mut added = 0usize;
1119    let mut added_ids: Vec<&lex_vcs::AttestationId> = Vec::new();
1120    for att in &attestations {
1121        let already_present = matches!(log.get(&att.attestation_id), Ok(Some(_)));
1122        match log.put(att) {
1123            Ok(()) => {
1124                if !already_present {
1125                    added += 1;
1126                    added_ids.push(&att.attestation_id);
1127                }
1128            }
1129            Err(e) => return error_response(500, format!("attestation log write: {e}")),
1130        }
1131    }
1132
1133    json_response(200, &serde_json::json!({
1134        "received": attestations.len(),
1135        "added": added,
1136        "skipped": attestations.len() - added,
1137        "added_ids": added_ids,
1138    }))
1139}
1140
1141/// `GET /v1/branches/<name>/head` (#242 follow-up). Probe endpoint
1142/// the `lex op push` client uses to discover the remote head before
1143/// computing a delta against `OpLog::ops_since`.
1144///
1145/// Response: `{ "branch": "main", "head_op": Option<OpId> }`.
1146/// Returns 200 even when the branch doesn't exist locally — the
1147/// answer in that case is `head_op: null`, which is the right
1148/// signal for "send everything you have."
1149pub(crate) fn branch_head_handler(state: &State, name: &str)
1150    -> Response<std::io::Cursor<Vec<u8>>>
1151{
1152    let store = state.store.lock().unwrap();
1153    let head = match store.get_branch(name) {
1154        Ok(Some(b)) => b.head_op,
1155        Ok(None) => None,
1156        Err(e) => return error_response(500, format!("get_branch: {e}")),
1157    };
1158    json_response(200, &serde_json::json!({
1159        "branch": name,
1160        "head_op": head,
1161    }))
1162}
1163
1164/// `GET /v1/ops/since?after=<op_id>&branch=<name>&limit=<n>` (#260).
1165/// Server endpoint for `lex op pull`.
1166///
1167/// Returns a JSON array of `OperationRecord`s reachable from
1168/// `branch.head_op` but not from `<after>`, sorted **oldest-first**
1169/// so the client can apply them in topological order without
1170/// re-sorting. Empty array when:
1171///
1172/// * The branch doesn't exist on the remote.
1173/// * The branch's `head_op` is `None`.
1174/// * `after == branch.head_op` (caller is already at the remote's head).
1175/// * `after` is *ahead of* the remote's head (caller is past the
1176///   remote — the symmetric "remote behind" case from #260).
1177///
1178/// `branch` defaults to `main`. `limit` caps the response — useful
1179/// for chunked pulls of large gaps; clients re-issue with the next
1180/// `after` once the prefix has landed.
1181///
1182/// Failure modes:
1183///
1184/// * `400` if the query string is malformed.
1185/// * `200` with `[]` for any of the empty-result cases above. "Caller
1186///   is already up to date" is a normal answer, not an error.
1187pub(crate) fn ops_since_handler(state: &State, query: &str)
1188    -> Response<std::io::Cursor<Vec<u8>>>
1189{
1190    let mut after: Option<String> = None;
1191    let mut branch = String::from("main");
1192    let mut limit: Option<usize> = None;
1193    for kv in query.split('&') {
1194        let Some((k, v)) = kv.split_once('=') else { continue };
1195        match k {
1196            "after" => after = Some(v.to_string()),
1197            "branch" => branch = v.to_string(),
1198            "limit" => {
1199                limit = Some(match v.parse::<usize>() {
1200                    Ok(n) => n,
1201                    Err(_) => return error_response(400,
1202                        format!("limit must be a positive integer, got `{v}`")),
1203                });
1204            }
1205            _ => {}
1206        }
1207    }
1208
1209    let store = state.store.lock().unwrap();
1210    let log = match lex_vcs::OpLog::open(store.root()) {
1211        Ok(l) => l,
1212        Err(e) => return error_response(500, format!("opening op log: {e}")),
1213    };
1214    let head = match store.get_branch(&branch) {
1215        Ok(Some(b)) => b.head_op,
1216        Ok(None) => None,
1217        Err(e) => return error_response(500, format!("get_branch: {e}")),
1218    };
1219    let Some(head) = head else {
1220        return json_response(200, &serde_json::json!([]));
1221    };
1222
1223    let ops_since = match log.ops_since(&head, after.as_ref()) {
1224        Ok(o) => o,
1225        Err(e) => return error_response(500, format!("ops_since: {e}")),
1226    };
1227    // ops_since walks newest-first; reverse so the client receives
1228    // oldest-first and can apply them in topological order with
1229    // `OpLog::put` straight through.
1230    let mut ops = ops_since;
1231    ops.reverse();
1232    if let Some(n) = limit {
1233        ops.truncate(n);
1234    }
1235
1236    json_response(200, &serde_json::to_value(&ops).unwrap_or_default())
1237}
1238
1239/// `GET /v1/attestations/since?after-op=<op_id>&limit=<n>` (#260).
1240/// Mirror of `ops_since_handler` for the attestation log.
1241///
1242/// Returns attestations whose `op_id` field is reachable from
1243/// **any** branch's head — not just one — and not in `after_op`'s
1244/// ancestry. The cross-branch fan-out matches the push side:
1245/// attestations are stage-keyed, not branch-keyed, so a single
1246/// "since this op" filter is the right shape.
1247///
1248/// Attestations with `op_id: None` (e.g. `Override`,
1249/// `ProducerBlock`) are always included — the cutoff doesn't apply.
1250/// `--limit` caps the response.
1251pub(crate) fn attestations_since_handler(state: &State, query: &str)
1252    -> Response<std::io::Cursor<Vec<u8>>>
1253{
1254    let mut after_op: Option<String> = None;
1255    let mut limit: Option<usize> = None;
1256    for kv in query.split('&') {
1257        let Some((k, v)) = kv.split_once('=') else { continue };
1258        match k {
1259            "after-op" => after_op = Some(v.to_string()),
1260            "limit" => {
1261                limit = Some(match v.parse::<usize>() {
1262                    Ok(n) => n,
1263                    Err(_) => return error_response(400,
1264                        format!("limit must be a positive integer, got `{v}`")),
1265                });
1266            }
1267            _ => {}
1268        }
1269    }
1270
1271    let store = state.store.lock().unwrap();
1272    let log = match store.attestation_log() {
1273        Ok(l) => l,
1274        Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1275    };
1276
1277    // Build the exclude set: every op_id reachable from `after_op`,
1278    // inclusive. Attestations whose op_id is in this set were
1279    // already known to the caller.
1280    let exclude: std::collections::BTreeSet<String> = match &after_op {
1281        None => std::collections::BTreeSet::new(),
1282        Some(cutoff) => {
1283            let op_log = match lex_vcs::OpLog::open(store.root()) {
1284                Ok(l) => l,
1285                Err(e) => return error_response(500, format!("opening op log: {e}")),
1286            };
1287            match op_log.walk_back(cutoff, None) {
1288                Ok(records) => records.into_iter().map(|r| r.op_id).collect(),
1289                Err(_) => {
1290                    // Cutoff op doesn't exist on this remote. Treat
1291                    // as "no exclude" — caller will get every
1292                    // attestation. They may dedup client-side.
1293                    std::collections::BTreeSet::new()
1294                }
1295            }
1296        }
1297    };
1298
1299    let all = match log.list_all() {
1300        Ok(v) => v,
1301        Err(e) => return error_response(500, format!("listing attestations: {e}")),
1302    };
1303    let mut filtered: Vec<lex_vcs::Attestation> = all
1304        .into_iter()
1305        .filter(|a| match &a.op_id {
1306            Some(op_id) => !exclude.contains(op_id),
1307            // No op_id = doesn't participate in the cutoff; always
1308            // ship it on the first pull, server-side idempotency
1309            // dedupes on the client.
1310            None => true,
1311        })
1312        .collect();
1313    // Stable order: oldest-first by `timestamp`, then by
1314    // `attestation_id` for ties. Lets the client land them
1315    // deterministically.
1316    filtered.sort_by(|a, b| {
1317        a.timestamp.cmp(&b.timestamp)
1318            .then_with(|| a.attestation_id.cmp(&b.attestation_id))
1319    });
1320    if let Some(n) = limit {
1321        filtered.truncate(n);
1322    }
1323
1324    json_response(200, &serde_json::to_value(&filtered).unwrap_or_default())
1325}
1326