Skip to main content

lex_api/
handlers.rs

//! Request routing for the agent API.
//!
//! Each handler is a synchronous function that builds and returns
//! the HTTP response itself: successes as 200 with a JSON body,
//! errors as 4xx/5xx, usually wrapped in a JSON
//! `{ "error": ..., "detail": ... }` envelope. The dispatcher
//! (`route`) only selects the handler and passes its response through.
7
8use indexmap::IndexMap;
9use lex_ast::canonicalize_program;
10use lex_bytecode::{compile_program, vm::Vm, Value};
11use lex_runtime::{check_program as check_policy, DefaultHandler, Policy};
12use lex_store::Store;
13use lex_syntax::load_program_from_str;
14use lex_vcs::{MergeSession, MergeSessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeSet, HashMap};
17use std::path::PathBuf;
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tiny_http::{Header, Method, Request, Response};
21
22pub struct State {
23    pub store: Mutex<Store>,
24    /// Filesystem root of the store. Held alongside the `Store`
25    /// itself so handlers that need to read store-level files
26    /// (e.g. `users.json` for actor auth) don't have to round-
27    /// trip through the lock.
28    pub root: PathBuf,
29    /// In-memory merge sessions, keyed by MergeSessionId. Sessions
30    /// are ephemeral by design (#134 foundation): they live for the
31    /// lifetime of the server process and are GC'd on commit. A
32    /// future slice can persist them to disk so a session survives
33    /// process restarts. For now an agent that gets unlucky with a
34    /// restart re-runs `merge/start` and gets a fresh session.
35    pub sessions: Mutex<HashMap<MergeSessionId, ApiMergeSession>>,
36}
37
38/// Server-side wrapper around [`MergeSession`] carrying the
39/// branch names that started the merge. The lex-vcs session
40/// itself only tracks `OpId` heads; commit needs the dst branch
41/// name to advance the right head, and the src branch name is
42/// kept for round-trip auditability ("which branch did we merge
43/// from?").
44pub struct ApiMergeSession {
45    pub inner: MergeSession,
46    pub src_branch: String,
47    pub dst_branch: String,
48}
49
50impl State {
51    pub fn open(root: PathBuf) -> anyhow::Result<Self> {
52        Ok(Self {
53            store: Mutex::new(Store::open(&root)?),
54            root,
55            sessions: Mutex::new(HashMap::new()),
56        })
57    }
58}
59
60#[derive(Debug, Serialize, Deserialize)]
61struct ErrorEnvelope {
62    error: String,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    detail: Option<serde_json::Value>,
65}
66
67fn json_response(status: u16, body: &serde_json::Value) -> Response<std::io::Cursor<Vec<u8>>> {
68    let bytes = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
69    Response::from_data(bytes)
70        .with_status_code(status)
71        .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
72}
73
74fn error_response(status: u16, msg: impl Into<String>) -> Response<std::io::Cursor<Vec<u8>>> {
75    json_response(status, &serde_json::to_value(ErrorEnvelope {
76        error: msg.into(), detail: None,
77    }).unwrap())
78}
79
80fn error_with_detail(status: u16, msg: impl Into<String>, detail: serde_json::Value)
81    -> Response<std::io::Cursor<Vec<u8>>>
82{
83    json_response(status, &serde_json::to_value(ErrorEnvelope {
84        error: msg.into(), detail: Some(detail),
85    }).unwrap())
86}
87
88/// Map a `StoreError` from a write path (`apply_operation` /
89/// `apply_operation_checked`) to an HTTP response. The only special
90/// case today is `Contention` (#262 multi-writer CAS retries
91/// exhausted), which maps to 503 with a `Retry-After` header so
92/// clients back off rather than hammering the same branch tip.
93fn write_error_response(prefix: &str, err: lex_store::StoreError)
94    -> Response<std::io::Cursor<Vec<u8>>>
95{
96    if let lex_store::StoreError::Contention { branch, attempts } = &err {
97        let body = serde_json::to_vec(&ErrorEnvelope {
98            error: format!("{prefix}: branch '{branch}' is contended (attempts={attempts})"),
99            detail: Some(serde_json::json!({
100                "kind": "contention",
101                "branch": branch,
102                "attempts": attempts,
103            })),
104        }).unwrap_or_else(|_| b"{}".to_vec());
105        return Response::from_data(body)
106            .with_status_code(503)
107            .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
108            .with_header(Header::from_bytes(&b"Retry-After"[..], &b"1"[..]).unwrap());
109    }
110    error_response(500, format!("{prefix}: {err}"))
111}
112
113pub fn handle(state: Arc<State>, mut req: Request) -> std::io::Result<()> {
114    let method = req.method().clone();
115    let url = req.url().to_string();
116    let path = url.split('?').next().unwrap_or("").to_string();
117    let query = url.split_once('?').map(|(_, q)| q.to_string()).unwrap_or_default();
118
119    // `X-Lex-User` is the v3d session identifier — set by humans
120    // operating the web UI through whatever proxy fronts auth, or
121    // by AI agents calling the JSON API. We pluck it once here so
122    // every handler can take it as a borrowed string.
123    let x_lex_user = req.headers().iter()
124        .find(|h| h.field.equiv("x-lex-user"))
125        .map(|h| h.value.as_str().to_string());
126
127    let mut body = String::new();
128    let _ = req.as_reader().read_to_string(&mut body);
129
130    let resp = route(&state, &method, &path, &query, &body, x_lex_user.as_deref());
131    req.respond(resp)
132}
133
134fn route(
135    state: &State,
136    method: &Method,
137    path: &str,
138    query: &str,
139    body: &str,
140    x_lex_user: Option<&str>,
141) -> Response<std::io::Cursor<Vec<u8>>> {
142    match (method, path) {
143        // ---- lex-tea v2 (HTML browser) ------------------------
144        (Method::Get, "/") => crate::web::activity_handler(state),
145        (Method::Get, "/web/branches") => crate::web::branches_handler(state),
146        (Method::Get, "/web/trust") => crate::web::trust_handler(state),
147        (Method::Get, "/web/attention") => crate::web::attention_handler(state),
148        (Method::Get, p) if p.starts_with("/web/branch/") => {
149            let name = &p["/web/branch/".len()..];
150            crate::web::branch_handler(state, name)
151        }
152        (Method::Get, p) if p.starts_with("/web/stage/") => {
153            let id = &p["/web/stage/".len()..];
154            crate::web::stage_html_handler(state, id)
155        }
156        // lex-tea v3 human-triage actions (#172). HTML forms post
157        // to /web/stage/<id>/{pin,defer,block,unblock} with a
158        // `reason` body. All four share one handler; the verb in
159        // the path picks the AttestationKind.
160        (Method::Post, p) if p.starts_with("/web/stage/") && (
161            p.ends_with("/pin") || p.ends_with("/defer")
162            || p.ends_with("/block") || p.ends_with("/unblock")
163        ) => {
164            let prefix_len = "/web/stage/".len();
165            let last_slash = p.rfind('/').unwrap_or(p.len());
166            let id = &p[prefix_len..last_slash];
167            let verb = &p[last_slash + 1..];
168            let decision = match verb {
169                "pin"     => crate::web::WebStageDecision::Pin,
170                "defer"   => crate::web::WebStageDecision::Defer,
171                "block"   => crate::web::WebStageDecision::Block,
172                "unblock" => crate::web::WebStageDecision::Unblock,
173                _ => unreachable!("matched in outer guard"),
174            };
175            crate::web::stage_decision_handler(state, id, body, decision, x_lex_user)
176        }
177        // ---- JSON API -----------------------------------------
178        (Method::Get, "/v1/health") => json_response(200, &serde_json::json!({"ok": true})),
179        (Method::Post, "/v1/parse") => parse_handler(body),
180        (Method::Post, "/v1/check") => check_handler(body),
181        (Method::Post, "/v1/publish") => publish_handler(state, body),
182        (Method::Post, "/v1/patch") => patch_handler(state, body),
183        (Method::Get, p) if p.starts_with("/v1/stage/") => {
184            let suffix = &p["/v1/stage/".len()..];
185            // Match `/v1/stage/<id>/attestations` first so a literal
186            // stage_id of "attestations" can't be misrouted.
187            if let Some(id) = suffix.strip_suffix("/attestations") {
188                stage_attestations_handler(state, id)
189            } else {
190                stage_handler(state, suffix)
191            }
192        }
193        (Method::Post, "/v1/run") => run_handler(state, body, false),
194        (Method::Post, "/v1/replay") => run_handler(state, body, true),
195        (Method::Get, p) if p.starts_with("/v1/trace/") => {
196            let id = &p["/v1/trace/".len()..];
197            trace_handler(state, id)
198        }
199        (Method::Get, "/v1/diff") => diff_handler(state, query),
200        (Method::Post, "/v1/merge/start") => merge_start_handler(state, body),
201        (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/resolve") => {
202            let id = &p["/v1/merge/".len()..p.len() - "/resolve".len()];
203            merge_resolve_handler(state, id, body)
204        }
205        (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/commit") => {
206            let id = &p["/v1/merge/".len()..p.len() - "/commit".len()];
207            merge_commit_handler(state, id)
208        }
209        // ---- #242: append-only sync of op log + attestation log
210        (Method::Post, "/v1/ops/batch") => ops_batch_handler(state, body),
211        (Method::Post, "/v1/attestations/batch") => attestations_batch_handler(state, body),
212        // Probe endpoint for `lex op push` to discover the remote's
213        // current head before computing a delta. Returns
214        // `{ "head_op": Option<OpId> }`. `<branch>` is URL-encoded.
215        (Method::Get, p) if p.starts_with("/v1/branches/") && p.ends_with("/head") => {
216            let name = &p["/v1/branches/".len()..p.len() - "/head".len()];
217            branch_head_handler(state, name)
218        }
219        // ---- #260: append-only fetch (inverse of #242 push)
220        // Body is a JSON array of OperationRecords reachable from
221        // `branch.head_op` but not from `after`, oldest-first.
222        (Method::Get, "/v1/ops/since") => ops_since_handler(state, query),
223        (Method::Get, "/v1/attestations/since") => attestations_since_handler(state, query),
224        _ => error_response(404, format!("unknown route: {method:?} {path}")),
225    }
226}
227
228#[derive(Deserialize)]
229struct ParseReq { source: String }
230
231fn parse_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
232    let req: ParseReq = match serde_json::from_str(body) {
233        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
234    };
235    match load_program_from_str(&req.source) {
236        Ok(prog) => {
237            let stages = canonicalize_program(&prog);
238            json_response(200, &serde_json::to_value(&stages).unwrap())
239        }
240        Err(e) => error_response(400, format!("syntax error: {e}")),
241    }
242}
243
244pub(crate) fn check_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
245    let req: ParseReq = match serde_json::from_str(body) {
246        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
247    };
248    let prog = match load_program_from_str(&req.source) {
249        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
250    };
251    let stages = canonicalize_program(&prog);
252    match lex_types::check_program(&stages) {
253        Ok(_) => json_response(200, &serde_json::json!({"ok": true})),
254        Err(errs) => json_response(422, &serde_json::to_value(&errs).unwrap()),
255    }
256}
257
258#[derive(Deserialize)]
259struct PublishReq { source: String, #[serde(default)] activate: bool }
260
261pub(crate) fn publish_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
262    let req: PublishReq = match serde_json::from_str(body) {
263        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
264    };
265    let prog = match load_program_from_str(&req.source) {
266        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
267    };
268    // #168: rewrite stdlib parse calls to parse_strict so the
269    // bytecode emitted from these stages enforces required-field
270    // checks at runtime.
271    let mut stages = canonicalize_program(&prog);
272    if let Err(errs) = lex_types::check_and_rewrite_program(&mut stages) {
273        return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
274    }
275
276    let store = state.store.lock().unwrap();
277    let branch = store.current_branch();
278
279    // Compute diff between what's already on the branch and the new program.
280    let old_head = match store.branch_head(&branch) {
281        Ok(h) => h,
282        Err(e) => return error_response(500, format!("branch_head: {e}")),
283    };
284    let old_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = old_head.values()
285        .filter_map(|stg| store.get_ast(stg).ok())
286        .filter_map(|s| match s {
287            lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd)),
288            _ => None,
289        })
290        .collect();
291    let new_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = stages.iter()
292        .filter_map(|s| match s {
293            lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd.clone())),
294            _ => None,
295        })
296        .collect();
297    let report = lex_vcs::compute_diff(&old_fns, &new_fns, false);
298
299    // Build new imports map from any Import stages in the source.
300    let mut new_imports: lex_vcs::ImportMap = lex_vcs::ImportMap::new();
301    {
302        let entry = new_imports.entry("<source>".into()).or_default();
303        for s in &stages {
304            if let lex_ast::Stage::Import(im) = s {
305                entry.insert(im.reference.clone());
306            }
307        }
308    }
309
310    match store.publish_program(&branch, &stages, &report, &new_imports, req.activate) {
311        Ok(outcome) => json_response(200, &serde_json::json!({
312            "ops": outcome.ops,
313            "head_op": outcome.head_op,
314        })),
315        // The store-write gate (#130) also type-checks at the top
316        // of `publish_program`. The handler above already pre-checks,
317        // so this branch is reached only on a race or a state we
318        // didn't see at handler time. Surface the structured
319        // envelope (422) instead of a generic 500 — same shape the
320        // initial pre-check uses, so a client only has one error
321        // contract to handle.
322        Err(lex_store::StoreError::TypeError(errs)) => {
323            error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap())
324        }
325        Err(e) => write_error_response("publish_program", e),
326    }
327}
328
329#[derive(Deserialize)]
330struct PatchReq {
331    stage_id: String,
332    patch: lex_ast::Patch,
333    #[serde(default)] activate: bool,
334}
335
336/// POST /v1/patch — apply a structured edit to a stored stage's
337/// canonical AST, type-check the result, and publish a new stage.
338fn patch_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
339    let req: PatchReq = match serde_json::from_str(body) {
340        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
341    };
342    let store = state.store.lock().unwrap();
343
344    // 1. Load.
345    let original = match store.get_ast(&req.stage_id) {
346        Ok(s) => s, Err(e) => return error_response(404, format!("stage: {e}")),
347    };
348
349    // 2. Apply.
350    let patched = match lex_ast::apply_patch(&original, &req.patch) {
351        Ok(s) => s,
352        Err(e) => return error_with_detail(422, "patch failed",
353            serde_json::to_value(&e).unwrap_or_default()),
354    };
355
356    // 3. Type-check the new stage in isolation.
357    let stages = vec![patched.clone()];
358    if let Err(errs) = lex_types::check_program(&stages) {
359        return error_with_detail(422, "type errors after patch",
360            serde_json::to_value(&errs).unwrap_or_default());
361    }
362
363    // Routing through apply_operation so /v1/patch participates in
364    // the op DAG. We know this op is always a body change on the
365    // existing sig (a patch can't add a brand-new fn).
366    let branch = store.current_branch();
367
368    // Find the sig — patched stage's sig must match the original's.
369    let sig = match lex_ast::sig_id(&patched) {
370        Some(s) => s,
371        None => return error_response(500, "patched stage has no sig_id"),
372    };
373
374    let new_id = match store.publish(&patched) {
375        Ok(id) => id, Err(e) => return error_response(500, format!("publish: {e}")),
376    };
377    if req.activate {
378        if let Err(e) = store.activate(&new_id) {
379            return error_response(500, format!("activate: {e}"));
380        }
381    }
382
383    // Determine op kind: ChangeEffectSig if effects differ, ModifyBody otherwise.
384    let original_effects: std::collections::BTreeSet<String> = match &original {
385        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
386        _ => std::collections::BTreeSet::new(),
387    };
388    let patched_effects: std::collections::BTreeSet<String> = match &patched {
389        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
390        _ => std::collections::BTreeSet::new(),
391    };
392    let head_now = match store.get_branch(&branch) {
393        Ok(b) => b.and_then(|b| b.head_op),
394        Err(e) => return error_response(500, format!("get_branch: {e}")),
395    };
396    let kind = if original_effects != patched_effects {
397        // #247: budget delta is part of the canonical payload now.
398        // Patch endpoints don't currently rehydrate the AST to
399        // recompute budgets, so leave them None — clients that
400        // need budget tracking should publish through the diff
401        // pipeline (`lex publish`) where `compute_diff` populates
402        // them.
403        let from_budget = lex_vcs::operation_budget_from_effects(&original_effects);
404        let to_budget = lex_vcs::operation_budget_from_effects(&patched_effects);
405        lex_vcs::OperationKind::ChangeEffectSig {
406            sig_id: sig.clone(),
407            from_stage_id: req.stage_id.clone(),
408            to_stage_id: new_id.clone(),
409            from_effects: original_effects,
410            to_effects: patched_effects,
411            from_budget,
412            to_budget,
413        }
414    } else {
415        let budget = lex_vcs::operation_budget_from_effects(&original_effects);
416        lex_vcs::OperationKind::ModifyBody {
417            sig_id: sig.clone(),
418            from_stage_id: req.stage_id.clone(),
419            to_stage_id: new_id.clone(),
420            from_budget: budget,
421            to_budget: budget,
422        }
423    };
424    let transition = lex_vcs::StageTransition::Replace {
425        sig_id: sig.clone(),
426        from: req.stage_id.clone(),
427        to: new_id.clone(),
428    };
429    let op = lex_vcs::Operation::new(
430        kind,
431        head_now.into_iter().collect::<Vec<_>>(),
432    );
433    let op_id = match store.apply_operation(&branch, op, transition) {
434        Ok(id) => id,
435        Err(e) => return write_error_response("apply_operation", e),
436    };
437
438    let status = format!("{:?}",
439        store.get_status(&new_id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
440    json_response(200, &serde_json::json!({
441        "old_stage_id": req.stage_id,
442        "new_stage_id": new_id,
443        "sig_id": sig,
444        "status": status,
445        "op_id": op_id,
446    }))
447}
448
449pub(crate) fn stage_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
450    let store = state.store.lock().unwrap();
451    let meta = match store.get_metadata(id) {
452        Ok(m) => m, Err(e) => return error_response(404, format!("{e}")),
453    };
454    let ast = match store.get_ast(id) {
455        Ok(a) => a, Err(e) => return error_response(404, format!("{e}")),
456    };
457    let status = format!("{:?}", store.get_status(id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
458    json_response(200, &serde_json::json!({
459        "metadata": meta,
460        "ast": ast,
461        "status": status,
462    }))
463}
464
465/// `GET /v1/stage/<id>/attestations` — every persisted attestation
466/// for this stage, newest-first by timestamp. Issue #132's
467/// queryable-evidence consumer surface.
468///
469/// 404s on unknown stage_id (matches `/v1/stage/<id>`'s shape so a
470/// caller round-tripping both endpoints sees consistent errors).
471/// Empty list (200) is *evidence of absence*: the stage exists but
472/// no producer has attested it.
473pub(crate) fn stage_attestations_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
474    let store = state.store.lock().unwrap();
475    if let Err(e) = store.get_metadata(id) {
476        return error_response(404, format!("{e}"));
477    }
478    let log = match store.attestation_log() {
479        Ok(l) => l,
480        Err(e) => return error_response(500, format!("attestation log: {e}")),
481    };
482    let mut listing = match log.list_for_stage(&id.to_string()) {
483        Ok(v) => v,
484        Err(e) => return error_response(500, format!("list_for_stage: {e}")),
485    };
486    listing.sort_by_key(|a| std::cmp::Reverse(a.timestamp));
487    json_response(200, &serde_json::json!({"attestations": listing}))
488}
489
490#[derive(Deserialize, Default)]
491struct PolicyJson {
492    #[serde(default)] allow_effects: Vec<String>,
493    #[serde(default)] allow_fs_read: Vec<String>,
494    #[serde(default)] allow_fs_write: Vec<String>,
495    #[serde(default)] budget: Option<u64>,
496}
497
498impl PolicyJson {
499    fn into_policy(self) -> Policy {
500        Policy {
501            allow_effects: self.allow_effects.into_iter().collect::<BTreeSet<_>>(),
502            allow_fs_read: self.allow_fs_read.into_iter().map(PathBuf::from).collect(),
503            allow_fs_write: self.allow_fs_write.into_iter().map(PathBuf::from).collect(),
504            allow_net_host: Vec::new(),
505            allow_proc: Vec::new(),
506            budget: self.budget,
507        }
508    }
509}
510
511#[derive(Deserialize)]
512struct RunReq {
513    source: String,
514    #[serde(rename = "fn")] func: String,
515    #[serde(default)] args: Vec<serde_json::Value>,
516    #[serde(default)] policy: PolicyJson,
517    #[serde(default)] overrides: IndexMap<String, serde_json::Value>,
518}
519
520pub(crate) fn run_handler(state: &State, body: &str, with_overrides: bool) -> Response<std::io::Cursor<Vec<u8>>> {
521    let req: RunReq = match serde_json::from_str(body) {
522        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
523    };
524    let prog = match load_program_from_str(&req.source) {
525        Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
526    };
527    let stages = canonicalize_program(&prog);
528    if let Err(errs) = lex_types::check_program(&stages) {
529        return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
530    }
531    let bc = compile_program(&stages);
532    let policy = req.policy.into_policy();
533    if let Err(violations) = check_policy(&bc, &policy) {
534        return error_with_detail(403, "policy violation", serde_json::to_value(&violations).unwrap());
535    }
536
537    let mut recorder = lex_trace::Recorder::new();
538    if with_overrides && !req.overrides.is_empty() {
539        recorder = recorder.with_overrides(req.overrides);
540    }
541    let handle = recorder.handle();
542    let handler = DefaultHandler::new(policy);
543    let mut vm = Vm::with_handler(&bc, Box::new(handler));
544    vm.set_tracer(Box::new(recorder));
545
546    let vargs: Vec<Value> = req.args.iter().map(json_to_value).collect();
547    let started = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
548    let result = vm.call(&req.func, vargs);
549    let ended = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
550
551    let store = state.store.lock().unwrap();
552    let (root_out, root_err, status) = match &result {
553        Ok(v) => (Some(value_to_json(v)), None, 200u16),
554        Err(e) => (None, Some(format!("{e}")), 200u16),
555    };
556    let tree = handle.finalize(req.func.clone(), serde_json::Value::Null,
557        root_out.clone(), root_err.clone(), started, ended);
558    let run_id = match store.save_trace(&tree) {
559        Ok(id) => id,
560        Err(e) => return error_response(500, format!("save_trace: {e}")),
561    };
562
563    let mut body = serde_json::json!({
564        "run_id": run_id,
565        "output": root_out,
566    });
567    if let Some(err) = root_err {
568        body["error"] = serde_json::Value::String(err);
569    }
570    json_response(status, &body)
571}
572
573fn trace_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
574    let store = state.store.lock().unwrap();
575    match store.load_trace(id) {
576        Ok(t) => json_response(200, &serde_json::to_value(&t).unwrap()),
577        Err(e) => error_response(404, format!("{e}")),
578    }
579}
580
581fn diff_handler(state: &State, query: &str) -> Response<std::io::Cursor<Vec<u8>>> {
582    let mut a = None;
583    let mut b = None;
584    for kv in query.split('&') {
585        if let Some((k, v)) = kv.split_once('=') {
586            match k { "a" => a = Some(v.to_string()), "b" => b = Some(v.to_string()), _ => {} }
587        }
588    }
589    let (Some(a), Some(b)) = (a, b) else {
590        return error_response(400, "missing a or b query params");
591    };
592    let store = state.store.lock().unwrap();
593    let ta = match store.load_trace(&a) { Ok(t) => t, Err(e) => return error_response(404, format!("a: {e}")) };
594    let tb = match store.load_trace(&b) { Ok(t) => t, Err(e) => return error_response(404, format!("b: {e}")) };
595    match lex_trace::diff_runs(&ta, &tb) {
596        Some(d) => json_response(200, &serde_json::to_value(&d).unwrap()),
597        None => json_response(200, &serde_json::json!({"divergence": null})),
598    }
599}
600
601fn json_to_value(v: &serde_json::Value) -> Value { Value::from_json(v) }
602
603fn value_to_json(v: &Value) -> serde_json::Value { v.to_json() }
604
605#[derive(Deserialize)]
606struct MergeStartReq {
607    src_branch: String,
608    dst_branch: String,
609}
610
611/// `POST /v1/merge/start` (#134) — open a stateful merge between two
612/// branch heads and return the conflicts the agent needs to
613/// resolve. Auto-resolved sigs (one-sided changes, identical
614/// changes both sides) are returned for audit but don't block
615/// commit.
616///
617/// Response: `{ merge_id, src_head, dst_head, lca, conflicts,
618/// auto_resolved_count }`. The session is held in process memory
619/// keyed by `merge_id` for subsequent `resolve` / `commit` calls
620/// (next slices).
621fn merge_start_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
622    let req: MergeStartReq = match serde_json::from_str(body) {
623        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
624    };
625    let store = state.store.lock().unwrap();
626    let src_head = match store.get_branch(&req.src_branch) {
627        Ok(Some(b)) => b.head_op,
628        Ok(None) => return error_response(404, format!("unknown src branch `{}`", req.src_branch)),
629        Err(e) => return error_response(500, format!("src branch read: {e}")),
630    };
631    let dst_head = match store.get_branch(&req.dst_branch) {
632        Ok(Some(b)) => b.head_op,
633        Ok(None) => return error_response(404, format!("unknown dst branch `{}`", req.dst_branch)),
634        Err(e) => return error_response(500, format!("dst branch read: {e}")),
635    };
636    let log = match lex_vcs::OpLog::open(store.root()) {
637        Ok(l) => l,
638        Err(e) => return error_response(500, format!("op log: {e}")),
639    };
640    // Caller doesn't choose merge_ids — minted server-side from
641    // wall clock + a per-process counter avoids leaking session
642    // ids' shape into the public surface.
643    let merge_id = mint_merge_id();
644    let session = match MergeSession::start(
645        merge_id.clone(),
646        &log,
647        src_head.as_ref(),
648        dst_head.as_ref(),
649    ) {
650        Ok(s) => s,
651        Err(e) => return error_response(500, format!("merge start: {e}")),
652    };
653    let conflicts: Vec<&lex_vcs::ConflictRecord> = session.remaining_conflicts();
654    let auto_resolved_count = session.auto_resolved.len();
655    let body = serde_json::json!({
656        "merge_id": merge_id,
657        "src_head": session.src_head,
658        "dst_head": session.dst_head,
659        "lca":      session.lca,
660        "conflicts": conflicts,
661        "auto_resolved_count": auto_resolved_count,
662    });
663    drop(conflicts);
664    drop(store);
665    let wrapped = ApiMergeSession {
666        inner: session,
667        src_branch: req.src_branch,
668        dst_branch: req.dst_branch,
669    };
670    state.sessions.lock().unwrap().insert(merge_id, wrapped);
671    json_response(200, &body)
672}
673
674#[derive(Deserialize)]
675struct MergeResolveReq {
676    /// Each entry is `(conflict_id, resolution)`. The resolution is
677    /// the same shape as `lex_vcs::Resolution`'s tagged JSON form
678    /// — `{"kind":"take_ours"}`, `{"kind":"take_theirs"}`,
679    /// `{"kind":"defer"}`, or `{"kind":"custom","op":{...}}`.
680    resolutions: Vec<MergeResolveEntry>,
681}
682
683#[derive(Deserialize)]
684struct MergeResolveEntry {
685    conflict_id: String,
686    resolution: lex_vcs::Resolution,
687}
688
689/// `POST /v1/merge/<id>/resolve` (#134) — submit batched
690/// resolutions against the conflicts surfaced by `merge/start`.
691/// Returns one verdict per input: accepted (recorded against the
692/// session) or rejected (with structured reason). The session
693/// stays alive across calls so an agent can iterate.
694///
695/// Errors:
696/// - 404 if `merge_id` doesn't refer to a live session (a typo
697///   or a session GC'd by a server restart).
698/// - 400 on malformed body.
699fn merge_resolve_handler(
700    state: &State,
701    merge_id: &str,
702    body: &str,
703) -> Response<std::io::Cursor<Vec<u8>>> {
704    let req: MergeResolveReq = match serde_json::from_str(body) {
705        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
706    };
707    let mut sessions = state.sessions.lock().unwrap();
708    let Some(wrapped) = sessions.get_mut(merge_id) else {
709        return error_response(404, format!("unknown merge_id `{merge_id}`"));
710    };
711    let pairs: Vec<(String, lex_vcs::Resolution)> = req.resolutions.into_iter()
712        .map(|e| (e.conflict_id, e.resolution))
713        .collect();
714    let verdicts = wrapped.inner.resolve(pairs);
715    let remaining: Vec<&lex_vcs::ConflictRecord> = wrapped.inner.remaining_conflicts();
716    let body = serde_json::json!({
717        "verdicts": verdicts,
718        "remaining_conflicts": remaining,
719    });
720    json_response(200, &body)
721}
722
723/// `POST /v1/merge/<id>/commit` (#134) — finalize a merge
724/// session. Builds a `Merge` op from the auto-resolved sigs +
725/// the conflict resolutions, applies it to the dst branch, and
726/// returns the new head op id. The session is dropped on
727/// success; the caller would re-run `merge/start` to land
728/// further changes.
729///
730/// Errors:
731/// - 404: unknown `merge_id`.
732/// - 422: conflicts remaining (pass `Defer` or just don't
733///   resolve a conflict and you land here). Body carries the
734///   list so the caller knows which still need attention.
735/// - 422: a `Custom` resolution was used. The data layer
736///   supports them but landing them via HTTP needs an extra
737///   pass to apply the custom op against the dst branch
738///   first; deferred to a follow-up slice. Use TakeOurs /
739///   TakeTheirs for now.
740/// - 500: filesystem error while landing the merge op.
741fn merge_commit_handler(
742    state: &State,
743    merge_id: &str,
744) -> Response<std::io::Cursor<Vec<u8>>> {
745    use std::collections::BTreeMap;
746    let wrapped = match state.sessions.lock().unwrap().remove(merge_id) {
747        Some(w) => w,
748        None => return error_response(404, format!("unknown merge_id `{merge_id}`")),
749    };
750    let dst_branch = wrapped.dst_branch.clone();
751    let src_head = wrapped.inner.src_head.clone();
752    let dst_head = wrapped.inner.dst_head.clone();
753    let auto_resolved = wrapped.inner.auto_resolved.clone();
754
755    // Translate auto-resolved + resolutions into the StageTransition::Merge
756    // entries map. Only sigs whose head changes relative to dst go in.
757    let mut entries: BTreeMap<lex_vcs::SigId, Option<lex_vcs::StageId>> = BTreeMap::new();
758
759    // Auto-resolved: only `Src` (one-sided change on src) modifies dst.
760    for outcome in &auto_resolved {
761        if let lex_vcs::MergeOutcome::Src { sig_id, stage_id } = outcome {
762            entries.insert(sig_id.clone(), stage_id.clone());
763        }
764    }
765
766    // Conflict resolutions.
767    let resolved = match wrapped.inner.commit() {
768        Ok(r) => r,
769        Err(lex_vcs::CommitError::ConflictsRemaining(ids)) => {
770            // Re-insert isn't possible since we removed above; the
771            // caller will need to re-start. That's acceptable: a
772            // commit-with-unresolved-conflicts is operator error.
773            return error_with_detail(
774                422,
775                "conflicts remaining",
776                serde_json::json!({"unresolved": ids}),
777            );
778        }
779    };
780
781    for (conflict_id, resolution) in resolved {
782        match resolution {
783            lex_vcs::Resolution::TakeOurs => {
784                // Dst already has its head. No entry needed.
785            }
786            lex_vcs::Resolution::TakeTheirs => {
787                // Find the conflict's `theirs` stage_id in the
788                // session snapshot. We don't have direct access to
789                // it post-commit (commit consumed the session); but
790                // we can reconstruct from `auto_resolved` plus the
791                // session's pre-commit conflict map. Since we
792                // already moved the inner session, the cleanest fix
793                // for this slice is to rebuild from the on-disk
794                // graph: walk src_head, find the latest stage for
795                // the conflict's sig.
796                match resolve_take_theirs(state, &src_head, &conflict_id) {
797                    Ok(stage_id) => {
798                        entries.insert(conflict_id.clone(), stage_id);
799                    }
800                    Err(e) => return error_response(500, format!("resolve take_theirs: {e}")),
801                }
802            }
803            lex_vcs::Resolution::Custom { op } => {
804                // The agent's brand-new op carries the merge target
805                // in its kind (e.g. ModifyBody.to_stage_id). The op
806                // itself isn't separately recorded in the log here
807                // — its head-map effect is folded into the merge
808                // op's entries map. Callers that want the op as a
809                // first-class history entry should publish it via
810                // /v1/publish first and submit a TakeTheirs/TakeOurs
811                // resolution against the resulting head.
812                match op.kind.merge_target() {
813                    Some((sig, stage)) => {
814                        if sig != conflict_id {
815                            return error_with_detail(
816                                422,
817                                "custom op targets a different sig than the conflict",
818                                serde_json::json!({
819                                    "conflict_id": conflict_id,
820                                    "op_targets": sig,
821                                }),
822                            );
823                        }
824                        entries.insert(conflict_id, stage);
825                    }
826                    None => {
827                        return error_with_detail(
828                            422,
829                            "custom op kind doesn't yield a single sig→stage delta",
830                            serde_json::json!({
831                                "conflict_id": conflict_id,
832                                "kind": serde_json::to_value(&op.kind).unwrap_or(serde_json::Value::Null),
833                            }),
834                        );
835                    }
836                }
837            }
838            lex_vcs::Resolution::Defer => {
839                // Unreachable: commit() rejects Defer above.
840                return error_response(500, "internal: Defer slipped past commit gate");
841            }
842        }
843    }
844
845    let resolved_count = entries.len();
846    let mut parents: Vec<lex_vcs::OpId> = Vec::new();
847    if let Some(d) = dst_head { parents.push(d); }
848    if let Some(s) = src_head { parents.push(s); }
849    let op = lex_vcs::Operation::new(
850        lex_vcs::OperationKind::Merge { resolved: resolved_count },
851        parents,
852    );
853    let transition = lex_vcs::StageTransition::Merge { entries };
854    let store = state.store.lock().unwrap();
855    match store.apply_operation(&dst_branch, op, transition) {
856        Ok(new_head_op) => json_response(200, &serde_json::json!({
857            "new_head_op": new_head_op,
858            "dst_branch": dst_branch,
859        })),
860        Err(e) => write_error_response("apply merge op", e),
861    }
862}
863
864/// Walk the op log from `src_head` backwards to find the latest
865/// stage assigned to `sig`. Used by the commit handler to figure
866/// out what stage `TakeTheirs` should land. `Ok(None)` means src
867/// removed the sig.
868fn resolve_take_theirs(
869    state: &State,
870    src_head: &Option<lex_vcs::OpId>,
871    sig: &lex_vcs::SigId,
872) -> std::io::Result<Option<lex_vcs::StageId>> {
873    let store = state.store.lock().unwrap();
874    let log = lex_vcs::OpLog::open(store.root())?;
875    let Some(head) = src_head.as_ref() else { return Ok(None); };
876    // Walk forward from root → head, replaying each op's transition
877    // for `sig`; the last assignment wins.
878    let mut current: Option<lex_vcs::StageId> = None;
879    for record in log.walk_forward(head, None)? {
880        match &record.produces {
881            lex_vcs::StageTransition::Create { sig_id, stage_id }
882                if sig_id == sig => { current = Some(stage_id.clone()); }
883            lex_vcs::StageTransition::Replace { sig_id, to, .. }
884                if sig_id == sig => { current = Some(to.clone()); }
885            lex_vcs::StageTransition::Remove { sig_id, .. }
886                if sig_id == sig => { current = None; }
887            lex_vcs::StageTransition::Rename { from, to, body_stage_id }
888                if from == sig || to == sig => {
889                if from == sig { current = None; }
890                if to == sig   { current = Some(body_stage_id.clone()); }
891            }
892            lex_vcs::StageTransition::Merge { entries } => {
893                if let Some(opt) = entries.get(sig) {
894                    current = opt.clone();
895                }
896            }
897            _ => {}
898        }
899    }
900    Ok(current)
901}
902
903fn mint_merge_id() -> MergeSessionId {
904    use std::sync::atomic::{AtomicU64, Ordering};
905    static COUNTER: AtomicU64 = AtomicU64::new(0);
906    let nanos = SystemTime::now()
907        .duration_since(UNIX_EPOCH)
908        .map(|d| d.as_nanos())
909        .unwrap_or(0);
910    let n = COUNTER.fetch_add(1, Ordering::Relaxed);
911    format!("merge_{nanos:x}_{n:x}")
912}
913
914// ---- #242: append-only sync ---------------------------------------
915
916/// `POST /v1/ops/batch` (#242). Server endpoint for `lex op push`.
917///
918/// Body: a JSON array of `OperationRecord`s. The handler validates
919/// DAG integrity by checking that every op's `parents` either
920/// already exist on the remote *or* appear earlier in the same
921/// batch. This lets a client send a topologically-ordered slice
922/// without first probing for what's already there.
923///
924/// Response shape:
925///
926/// ```json
927/// { "received": N, "added": M, "skipped": (N-M), "added_ids": [...] }
928/// ```
929///
930/// Failure modes:
931///
932/// * `400` — body isn't a JSON array of op records.
933/// * `422` with `{ "error": "MissingParent", "detail": { "op_id":
934///   ..., "missing_parent": ... } }` if a parent is unreachable.
935///   The whole batch is rejected; nothing is persisted. The client
936///   should backfill the missing op and retry.
937/// * `409` if the supplied `op_id` doesn't match the canonical
938///   hash of the record's payload — content addressing must hold
939///   over the wire.
940///
941/// Idempotency: a record whose `op_id` already exists is silently
942/// skipped (not added, not rejected). Pushing the same payload
943/// twice is `received == N, added == 0` on the second call.
944pub(crate) fn ops_batch_handler(state: &State, body: &str)
945    -> Response<std::io::Cursor<Vec<u8>>>
946{
947    let records: Vec<lex_vcs::OperationRecord> = match serde_json::from_str(body) {
948        Ok(r) => r,
949        Err(e) => return error_response(400,
950            format!("body must be a JSON array of OperationRecord: {e}")),
951    };
952    let store = state.store.lock().unwrap();
953    let log = match lex_vcs::OpLog::open(store.root()) {
954        Ok(l) => l,
955        Err(e) => return error_response(500, format!("opening op log: {e}")),
956    };
957
958    // Validate every record before persisting any of them.
959    //
960    // 1. Content-addressing: the supplied `op_id` must match the
961    //    canonical hash of `record.op`. Otherwise the client is
962    //    sending a forged or corrupted record.
963    // 2. DAG integrity: every parent must either already exist in
964    //    the local log OR appear earlier in this batch.
965    let mut batch_ids: std::collections::BTreeSet<lex_vcs::OpId> =
966        std::collections::BTreeSet::new();
967    for rec in &records {
968        let expected = rec.op.op_id();
969        if expected != rec.op_id {
970            return error_with_detail(409, "OpIdMismatch", serde_json::json!({
971                "supplied": rec.op_id,
972                "expected": expected,
973            }));
974        }
975        for parent in &rec.op.parents {
976            let known = match log.get(parent) {
977                Ok(Some(_)) => true,
978                Ok(None) => false,
979                Err(e) => return error_response(500, format!("op log read: {e}")),
980            };
981            if !known && !batch_ids.contains(parent) {
982                return error_with_detail(422, "MissingParent", serde_json::json!({
983                    "op_id": rec.op_id,
984                    "missing_parent": parent,
985                }));
986            }
987        }
988        batch_ids.insert(rec.op_id.clone());
989    }
990
991    // Persist. `OpLog::put` is idempotent so a re-push is a no-op
992    // for already-present records.
993    let mut added = 0usize;
994    let mut added_ids: Vec<&lex_vcs::OpId> = Vec::new();
995    for rec in &records {
996        let already_present = matches!(log.get(&rec.op_id), Ok(Some(_)));
997        match log.put(rec) {
998            Ok(()) => {
999                if !already_present {
1000                    added += 1;
1001                    added_ids.push(&rec.op_id);
1002                }
1003            }
1004            Err(e) => return error_response(500, format!("op log write: {e}")),
1005        }
1006    }
1007
1008    json_response(200, &serde_json::json!({
1009        "received": records.len(),
1010        "added": added,
1011        "skipped": records.len() - added,
1012        "added_ids": added_ids,
1013    }))
1014}
1015
1016/// `POST /v1/attestations/batch` (#242). Server endpoint for `lex
1017/// attest push`.
1018///
1019/// Body: a JSON array of `Attestation`s. Validates that each
1020/// attestation's `op_id` (when set) refers to an op that already
1021/// exists on the remote — `attestation_id` is then re-derivable
1022/// from the canonical form, so cross-store dedup just works.
1023///
1024/// Response: same shape as `ops_batch_handler` but `added_ids` is
1025/// the list of accepted `attestation_id`s.
1026///
1027/// Failure modes:
1028///
1029/// * `400` for malformed JSON.
1030/// * `422` with `{ "error": "UnknownOp", "detail": { ... } }` if
1031///   an attestation's `op_id` references an op the remote doesn't
1032///   know about. Whole batch rejected.
1033/// * `409` `AttestationIdMismatch` if the supplied id doesn't
1034///   match the canonical hash.
1035///
1036/// Idempotency: same as the ops endpoint — content-addressed dedup.
1037pub(crate) fn attestations_batch_handler(state: &State, body: &str)
1038    -> Response<std::io::Cursor<Vec<u8>>>
1039{
1040    let attestations: Vec<lex_vcs::Attestation> = match serde_json::from_str(body) {
1041        Ok(a) => a,
1042        Err(e) => return error_response(400,
1043            format!("body must be a JSON array of Attestation: {e}")),
1044    };
1045    let store = state.store.lock().unwrap();
1046    let log = match store.attestation_log() {
1047        Ok(l) => l,
1048        Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1049    };
1050    let op_log = match lex_vcs::OpLog::open(store.root()) {
1051        Ok(l) => l,
1052        Err(e) => return error_response(500, format!("opening op log: {e}")),
1053    };
1054
1055    // Validate before persisting any record.
1056    for att in &attestations {
1057        // Content-addressing: re-derive attestation_id from the
1058        // payload and reject mismatches.
1059        let expected = lex_vcs::Attestation::with_timestamp(
1060            att.stage_id.clone(),
1061            att.op_id.clone(),
1062            att.intent_id.clone(),
1063            att.kind.clone(),
1064            att.result.clone(),
1065            att.produced_by.clone(),
1066            att.cost.clone(),
1067            att.timestamp,
1068        ).attestation_id;
1069        if expected != att.attestation_id {
1070            return error_with_detail(409, "AttestationIdMismatch", serde_json::json!({
1071                "supplied": att.attestation_id,
1072                "expected": expected,
1073            }));
1074        }
1075        // The op_id field, if set, must point at an op the remote
1076        // knows about. Without this check, attestations would
1077        // dangle into a future sync that never lands the op.
1078        if let Some(op_id) = &att.op_id {
1079            match op_log.get(op_id) {
1080                Ok(Some(_)) => {}
1081                Ok(None) => return error_with_detail(422, "UnknownOp", serde_json::json!({
1082                    "attestation_id": att.attestation_id,
1083                    "op_id": op_id,
1084                })),
1085                Err(e) => return error_response(500, format!("op log read: {e}")),
1086            }
1087        }
1088    }
1089
1090    // Persist. `AttestationLog::put` is idempotent on
1091    // `attestation_id` and the by-stage index is rewritten as a
1092    // marker file, also idempotent.
1093    let mut added = 0usize;
1094    let mut added_ids: Vec<&lex_vcs::AttestationId> = Vec::new();
1095    for att in &attestations {
1096        let already_present = matches!(log.get(&att.attestation_id), Ok(Some(_)));
1097        match log.put(att) {
1098            Ok(()) => {
1099                if !already_present {
1100                    added += 1;
1101                    added_ids.push(&att.attestation_id);
1102                }
1103            }
1104            Err(e) => return error_response(500, format!("attestation log write: {e}")),
1105        }
1106    }
1107
1108    json_response(200, &serde_json::json!({
1109        "received": attestations.len(),
1110        "added": added,
1111        "skipped": attestations.len() - added,
1112        "added_ids": added_ids,
1113    }))
1114}
1115
1116/// `GET /v1/branches/<name>/head` (#242 follow-up). Probe endpoint
1117/// the `lex op push` client uses to discover the remote head before
1118/// computing a delta against `OpLog::ops_since`.
1119///
1120/// Response: `{ "branch": "main", "head_op": Option<OpId> }`.
1121/// Returns 200 even when the branch doesn't exist locally — the
1122/// answer in that case is `head_op: null`, which is the right
1123/// signal for "send everything you have."
1124pub(crate) fn branch_head_handler(state: &State, name: &str)
1125    -> Response<std::io::Cursor<Vec<u8>>>
1126{
1127    let store = state.store.lock().unwrap();
1128    let head = match store.get_branch(name) {
1129        Ok(Some(b)) => b.head_op,
1130        Ok(None) => None,
1131        Err(e) => return error_response(500, format!("get_branch: {e}")),
1132    };
1133    json_response(200, &serde_json::json!({
1134        "branch": name,
1135        "head_op": head,
1136    }))
1137}
1138
1139/// `GET /v1/ops/since?after=<op_id>&branch=<name>&limit=<n>` (#260).
1140/// Server endpoint for `lex op pull`.
1141///
1142/// Returns a JSON array of `OperationRecord`s reachable from
1143/// `branch.head_op` but not from `<after>`, sorted **oldest-first**
1144/// so the client can apply them in topological order without
1145/// re-sorting. Empty array when:
1146///
1147/// * The branch doesn't exist on the remote.
1148/// * The branch's `head_op` is `None`.
1149/// * `after == branch.head_op` (caller is already at the remote's head).
1150/// * `after` is *ahead of* the remote's head (caller is past the
1151///   remote — the symmetric "remote behind" case from #260).
1152///
1153/// `branch` defaults to `main`. `limit` caps the response — useful
1154/// for chunked pulls of large gaps; clients re-issue with the next
1155/// `after` once the prefix has landed.
1156///
1157/// Failure modes:
1158///
1159/// * `400` if the query string is malformed.
1160/// * `200` with `[]` for any of the empty-result cases above. "Caller
1161///   is already up to date" is a normal answer, not an error.
1162pub(crate) fn ops_since_handler(state: &State, query: &str)
1163    -> Response<std::io::Cursor<Vec<u8>>>
1164{
1165    let mut after: Option<String> = None;
1166    let mut branch = String::from("main");
1167    let mut limit: Option<usize> = None;
1168    for kv in query.split('&') {
1169        let Some((k, v)) = kv.split_once('=') else { continue };
1170        match k {
1171            "after" => after = Some(v.to_string()),
1172            "branch" => branch = v.to_string(),
1173            "limit" => {
1174                limit = Some(match v.parse::<usize>() {
1175                    Ok(n) => n,
1176                    Err(_) => return error_response(400,
1177                        format!("limit must be a positive integer, got `{v}`")),
1178                });
1179            }
1180            _ => {}
1181        }
1182    }
1183
1184    let store = state.store.lock().unwrap();
1185    let log = match lex_vcs::OpLog::open(store.root()) {
1186        Ok(l) => l,
1187        Err(e) => return error_response(500, format!("opening op log: {e}")),
1188    };
1189    let head = match store.get_branch(&branch) {
1190        Ok(Some(b)) => b.head_op,
1191        Ok(None) => None,
1192        Err(e) => return error_response(500, format!("get_branch: {e}")),
1193    };
1194    let Some(head) = head else {
1195        return json_response(200, &serde_json::json!([]));
1196    };
1197
1198    let ops_since = match log.ops_since(&head, after.as_ref()) {
1199        Ok(o) => o,
1200        Err(e) => return error_response(500, format!("ops_since: {e}")),
1201    };
1202    // ops_since walks newest-first; reverse so the client receives
1203    // oldest-first and can apply them in topological order with
1204    // `OpLog::put` straight through.
1205    let mut ops = ops_since;
1206    ops.reverse();
1207    if let Some(n) = limit {
1208        ops.truncate(n);
1209    }
1210
1211    json_response(200, &serde_json::to_value(&ops).unwrap_or_default())
1212}
1213
1214/// `GET /v1/attestations/since?after-op=<op_id>&limit=<n>` (#260).
1215/// Mirror of `ops_since_handler` for the attestation log.
1216///
1217/// Returns attestations whose `op_id` field is reachable from
1218/// **any** branch's head — not just one — and not in `after_op`'s
1219/// ancestry. The cross-branch fan-out matches the push side:
1220/// attestations are stage-keyed, not branch-keyed, so a single
1221/// "since this op" filter is the right shape.
1222///
1223/// Attestations with `op_id: None` (e.g. `Override`,
1224/// `ProducerBlock`) are always included — the cutoff doesn't apply.
1225/// `--limit` caps the response.
1226pub(crate) fn attestations_since_handler(state: &State, query: &str)
1227    -> Response<std::io::Cursor<Vec<u8>>>
1228{
1229    let mut after_op: Option<String> = None;
1230    let mut limit: Option<usize> = None;
1231    for kv in query.split('&') {
1232        let Some((k, v)) = kv.split_once('=') else { continue };
1233        match k {
1234            "after-op" => after_op = Some(v.to_string()),
1235            "limit" => {
1236                limit = Some(match v.parse::<usize>() {
1237                    Ok(n) => n,
1238                    Err(_) => return error_response(400,
1239                        format!("limit must be a positive integer, got `{v}`")),
1240                });
1241            }
1242            _ => {}
1243        }
1244    }
1245
1246    let store = state.store.lock().unwrap();
1247    let log = match store.attestation_log() {
1248        Ok(l) => l,
1249        Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1250    };
1251
1252    // Build the exclude set: every op_id reachable from `after_op`,
1253    // inclusive. Attestations whose op_id is in this set were
1254    // already known to the caller.
1255    let exclude: std::collections::BTreeSet<String> = match &after_op {
1256        None => std::collections::BTreeSet::new(),
1257        Some(cutoff) => {
1258            let op_log = match lex_vcs::OpLog::open(store.root()) {
1259                Ok(l) => l,
1260                Err(e) => return error_response(500, format!("opening op log: {e}")),
1261            };
1262            match op_log.walk_back(cutoff, None) {
1263                Ok(records) => records.into_iter().map(|r| r.op_id).collect(),
1264                Err(_) => {
1265                    // Cutoff op doesn't exist on this remote. Treat
1266                    // as "no exclude" — caller will get every
1267                    // attestation. They may dedup client-side.
1268                    std::collections::BTreeSet::new()
1269                }
1270            }
1271        }
1272    };
1273
1274    let all = match log.list_all() {
1275        Ok(v) => v,
1276        Err(e) => return error_response(500, format!("listing attestations: {e}")),
1277    };
1278    let mut filtered: Vec<lex_vcs::Attestation> = all
1279        .into_iter()
1280        .filter(|a| match &a.op_id {
1281            Some(op_id) => !exclude.contains(op_id),
1282            // No op_id = doesn't participate in the cutoff; always
1283            // ship it on the first pull, server-side idempotency
1284            // dedupes on the client.
1285            None => true,
1286        })
1287        .collect();
1288    // Stable order: oldest-first by `timestamp`, then by
1289    // `attestation_id` for ties. Lets the client land them
1290    // deterministically.
1291    filtered.sort_by(|a, b| {
1292        a.timestamp.cmp(&b.timestamp)
1293            .then_with(|| a.attestation_id.cmp(&b.attestation_id))
1294    });
1295    if let Some(n) = limit {
1296        filtered.truncate(n);
1297    }
1298
1299    json_response(200, &serde_json::to_value(&filtered).unwrap_or_default())
1300}
1301