1use indexmap::IndexMap;
9use lex_ast::canonicalize_program;
10use lex_bytecode::{compile_program, vm::Vm, Value};
11use lex_runtime::{check_program as check_policy, DefaultHandler, Policy};
12use lex_store::Store;
13use lex_syntax::load_program_from_str;
14use lex_vcs::{MergeSession, MergeSessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeSet, HashMap};
17use std::path::PathBuf;
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tiny_http::{Header, Method, Request, Response};
21
/// Shared server state passed to every request handler.
pub struct State {
    /// The store, guarded by a mutex; handlers lock it while they read or write.
    pub store: Mutex<Store>,
    /// Root path the store was opened from (see `State::open`).
    pub root: PathBuf,
    /// In-flight merge sessions keyed by merge id. Entries are inserted by
    /// `merge_start_handler` and removed by `merge_commit_handler`.
    pub sessions: Mutex<HashMap<MergeSessionId, ApiMergeSession>>,
}
37
/// A merge session together with the branch names it was started with.
pub struct ApiMergeSession {
    /// The underlying merge session.
    pub inner: MergeSession,
    /// Source branch name taken from the start request.
    pub src_branch: String,
    /// Destination branch name; the merge operation is applied to this
    /// branch when the session is committed.
    pub dst_branch: String,
}
49
50impl State {
51 pub fn open(root: PathBuf) -> anyhow::Result<Self> {
52 Ok(Self {
53 store: Mutex::new(Store::open(&root)?),
54 root,
55 sessions: Mutex::new(HashMap::new()),
56 })
57 }
58}
59
/// JSON body used for error responses.
#[derive(Debug, Serialize, Deserialize)]
struct ErrorEnvelope {
    /// Human-readable error message.
    error: String,
    /// Optional structured detail; omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    detail: Option<serde_json::Value>,
}
66
67fn json_response(status: u16, body: &serde_json::Value) -> Response<std::io::Cursor<Vec<u8>>> {
68 let bytes = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
69 Response::from_data(bytes)
70 .with_status_code(status)
71 .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
72}
73
74fn error_response(status: u16, msg: impl Into<String>) -> Response<std::io::Cursor<Vec<u8>>> {
75 json_response(status, &serde_json::to_value(ErrorEnvelope {
76 error: msg.into(), detail: None,
77 }).unwrap())
78}
79
80fn error_with_detail(status: u16, msg: impl Into<String>, detail: serde_json::Value)
81 -> Response<std::io::Cursor<Vec<u8>>>
82{
83 json_response(status, &serde_json::to_value(ErrorEnvelope {
84 error: msg.into(), detail: Some(detail),
85 }).unwrap())
86}
87
88fn write_error_response(prefix: &str, err: lex_store::StoreError)
94 -> Response<std::io::Cursor<Vec<u8>>>
95{
96 if let lex_store::StoreError::Contention { branch, attempts } = &err {
97 let body = serde_json::to_vec(&ErrorEnvelope {
98 error: format!("{prefix}: branch '{branch}' is contended (attempts={attempts})"),
99 detail: Some(serde_json::json!({
100 "kind": "contention",
101 "branch": branch,
102 "attempts": attempts,
103 })),
104 }).unwrap_or_else(|_| b"{}".to_vec());
105 return Response::from_data(body)
106 .with_status_code(503)
107 .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
108 .with_header(Header::from_bytes(&b"Retry-After"[..], &b"1"[..]).unwrap());
109 }
110 if let lex_store::StoreError::BudgetExceeded { session_id, cap, spent_after } = &err {
118 let body = serde_json::to_vec(&ErrorEnvelope {
119 error: format!(
120 "{prefix}: session `{session_id}` budget exceeded \
121 (spent_after={spent_after}, cap={cap})"
122 ),
123 detail: Some(serde_json::json!({
124 "kind": "budget_exceeded",
125 "session_id": session_id,
126 "cap": cap,
127 "spent_after": spent_after,
128 })),
129 }).unwrap_or_else(|_| b"{}".to_vec());
130 return Response::from_data(body)
131 .with_status_code(503)
132 .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
133 .with_header(Header::from_bytes(&b"Retry-After"[..], &b"0"[..]).unwrap());
134 }
135 error_response(500, format!("{prefix}: {err}"))
136}
137
138pub fn handle(state: Arc<State>, mut req: Request) -> std::io::Result<()> {
139 let method = req.method().clone();
140 let url = req.url().to_string();
141 let path = url.split('?').next().unwrap_or("").to_string();
142 let query = url.split_once('?').map(|(_, q)| q.to_string()).unwrap_or_default();
143
144 let x_lex_user = req.headers().iter()
149 .find(|h| h.field.equiv("x-lex-user"))
150 .map(|h| h.value.as_str().to_string());
151
152 let mut body = String::new();
153 let _ = req.as_reader().read_to_string(&mut body);
154
155 let resp = route(&state, &method, &path, &query, &body, x_lex_user.as_deref());
156 req.respond(resp)
157}
158
159fn route(
160 state: &State,
161 method: &Method,
162 path: &str,
163 query: &str,
164 body: &str,
165 x_lex_user: Option<&str>,
166) -> Response<std::io::Cursor<Vec<u8>>> {
167 match (method, path) {
168 (Method::Get, "/") => crate::web::activity_handler(state),
170 (Method::Get, "/web/branches") => crate::web::branches_handler(state),
171 (Method::Get, "/web/trust") => crate::web::trust_handler(state),
172 (Method::Get, "/web/attention") => crate::web::attention_handler(state),
173 (Method::Get, p) if p.starts_with("/web/branch/") => {
174 let name = &p["/web/branch/".len()..];
175 crate::web::branch_handler(state, name)
176 }
177 (Method::Get, p) if p.starts_with("/web/stage/") => {
178 let id = &p["/web/stage/".len()..];
179 crate::web::stage_html_handler(state, id)
180 }
181 (Method::Post, p) if p.starts_with("/web/stage/") && (
186 p.ends_with("/pin") || p.ends_with("/defer")
187 || p.ends_with("/block") || p.ends_with("/unblock")
188 ) => {
189 let prefix_len = "/web/stage/".len();
190 let last_slash = p.rfind('/').unwrap_or(p.len());
191 let id = &p[prefix_len..last_slash];
192 let verb = &p[last_slash + 1..];
193 let decision = match verb {
194 "pin" => crate::web::WebStageDecision::Pin,
195 "defer" => crate::web::WebStageDecision::Defer,
196 "block" => crate::web::WebStageDecision::Block,
197 "unblock" => crate::web::WebStageDecision::Unblock,
198 _ => unreachable!("matched in outer guard"),
199 };
200 crate::web::stage_decision_handler(state, id, body, decision, x_lex_user)
201 }
202 (Method::Get, "/v1/health") => json_response(200, &serde_json::json!({"ok": true})),
204 (Method::Post, "/v1/parse") => parse_handler(body),
205 (Method::Post, "/v1/check") => check_handler(body),
206 (Method::Post, "/v1/publish") => publish_handler(state, body),
207 (Method::Post, "/v1/patch") => patch_handler(state, body),
208 (Method::Get, p) if p.starts_with("/v1/stage/") => {
209 let suffix = &p["/v1/stage/".len()..];
210 if let Some(id) = suffix.strip_suffix("/attestations") {
213 stage_attestations_handler(state, id)
214 } else {
215 stage_handler(state, suffix)
216 }
217 }
218 (Method::Post, "/v1/run") => run_handler(state, body, false),
219 (Method::Post, "/v1/replay") => run_handler(state, body, true),
220 (Method::Get, p) if p.starts_with("/v1/trace/") => {
221 let id = &p["/v1/trace/".len()..];
222 trace_handler(state, id)
223 }
224 (Method::Get, "/v1/diff") => diff_handler(state, query),
225 (Method::Post, "/v1/merge/start") => merge_start_handler(state, body),
226 (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/resolve") => {
227 let id = &p["/v1/merge/".len()..p.len() - "/resolve".len()];
228 merge_resolve_handler(state, id, body)
229 }
230 (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/commit") => {
231 let id = &p["/v1/merge/".len()..p.len() - "/commit".len()];
232 merge_commit_handler(state, id)
233 }
234 (Method::Post, "/v1/ops/batch") => ops_batch_handler(state, body),
236 (Method::Post, "/v1/attestations/batch") => attestations_batch_handler(state, body),
237 (Method::Get, p) if p.starts_with("/v1/branches/") && p.ends_with("/head") => {
241 let name = &p["/v1/branches/".len()..p.len() - "/head".len()];
242 branch_head_handler(state, name)
243 }
244 (Method::Get, "/v1/ops/since") => ops_since_handler(state, query),
248 (Method::Get, "/v1/attestations/since") => attestations_since_handler(state, query),
249 _ => error_response(404, format!("unknown route: {method:?} {path}")),
250 }
251}
252
/// Request body for `/v1/parse` and `/v1/check`.
#[derive(Deserialize)]
struct ParseReq {
    /// Program source text handed to `load_program_from_str`.
    source: String,
}
255
256fn parse_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
257 let req: ParseReq = match serde_json::from_str(body) {
258 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
259 };
260 match load_program_from_str(&req.source) {
261 Ok(prog) => {
262 let stages = canonicalize_program(&prog);
263 json_response(200, &serde_json::to_value(&stages).unwrap())
264 }
265 Err(e) => error_response(400, format!("syntax error: {e}")),
266 }
267}
268
269pub(crate) fn check_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
270 let req: ParseReq = match serde_json::from_str(body) {
271 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
272 };
273 let prog = match load_program_from_str(&req.source) {
274 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
275 };
276 let stages = canonicalize_program(&prog);
277 match lex_types::check_program(&stages) {
278 Ok(_) => json_response(200, &serde_json::json!({"ok": true})),
279 Err(errs) => json_response(422, &serde_json::to_value(&errs).unwrap()),
280 }
281}
282
/// Request body for `/v1/publish`.
#[derive(Deserialize)]
struct PublishReq {
    /// Program source text to publish.
    source: String,
    /// Forwarded to `publish_program`; defaults to `false` when omitted.
    #[serde(default)] activate: bool,
}
285
286pub(crate) fn publish_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
287 let req: PublishReq = match serde_json::from_str(body) {
288 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
289 };
290 let prog = match load_program_from_str(&req.source) {
291 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
292 };
293 let mut stages = canonicalize_program(&prog);
297 if let Err(errs) = lex_types::check_and_rewrite_program(&mut stages) {
298 return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
299 }
300
301 let store = state.store.lock().unwrap();
302 let branch = store.current_branch();
303
304 let old_head = match store.branch_head(&branch) {
306 Ok(h) => h,
307 Err(e) => return error_response(500, format!("branch_head: {e}")),
308 };
309 let old_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = old_head.values()
310 .filter_map(|stg| store.get_ast(stg).ok())
311 .filter_map(|s| match s {
312 lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd)),
313 _ => None,
314 })
315 .collect();
316 let new_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = stages.iter()
317 .filter_map(|s| match s {
318 lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd.clone())),
319 _ => None,
320 })
321 .collect();
322 let report = lex_vcs::compute_diff(&old_fns, &new_fns, false);
323
324 let mut new_imports: lex_vcs::ImportMap = lex_vcs::ImportMap::new();
326 {
327 let entry = new_imports.entry("<source>".into()).or_default();
328 for s in &stages {
329 if let lex_ast::Stage::Import(im) = s {
330 entry.insert(im.reference.clone());
331 }
332 }
333 }
334
335 match store.publish_program(&branch, &stages, &report, &new_imports, req.activate) {
336 Ok(outcome) => json_response(200, &serde_json::json!({
337 "ops": outcome.ops,
338 "head_op": outcome.head_op,
339 })),
340 Err(lex_store::StoreError::TypeError(errs)) => {
348 error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap())
349 }
350 Err(e) => write_error_response("publish_program", e),
351 }
352}
353
/// Request body for `/v1/patch`.
#[derive(Deserialize)]
struct PatchReq {
    /// Id of the stage the patch is applied to.
    stage_id: String,
    /// The patch itself, applied via `lex_ast::apply_patch`.
    patch: lex_ast::Patch,
    /// When true the newly published stage is also activated; defaults to `false`.
    #[serde(default)] activate: bool,
}
360
/// Applies a patch to an existing stage, publishes the result, and records
/// the change as an operation on the store's current branch.
///
/// Responses:
/// * 400 — malformed request body.
/// * 404 — the referenced stage cannot be loaded.
/// * 422 — the patch fails to apply, or the patched stage fails type-checking.
/// * 500 — missing sig id, publish/activate failure, or branch read failure.
/// * `apply_operation` failures are mapped by `write_error_response`.
/// * 200 — JSON with the old and new stage ids, sig id, status, and op id.
fn patch_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
    let req: PatchReq = match serde_json::from_str(body) {
        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
    };
    // The store lock is held for the rest of the function.
    let store = state.store.lock().unwrap();

    let original = match store.get_ast(&req.stage_id) {
        Ok(s) => s, Err(e) => return error_response(404, format!("stage: {e}")),
    };

    let patched = match lex_ast::apply_patch(&original, &req.patch) {
        Ok(s) => s,
        Err(e) => return error_with_detail(422, "patch failed",
            serde_json::to_value(&e).unwrap_or_default()),
    };

    // Type-check the patched stage on its own, as a one-stage program.
    let stages = vec![patched.clone()];
    if let Err(errs) = lex_types::check_program(&stages) {
        return error_with_detail(422, "type errors after patch",
            serde_json::to_value(&errs).unwrap_or_default());
    }

    let branch = store.current_branch();

    let sig = match lex_ast::sig_id(&patched) {
        Some(s) => s,
        None => return error_response(500, "patched stage has no sig_id"),
    };

    // Publish (and optionally activate) before the operation is recorded.
    // NOTE(review): if a later step fails, the published stage is not rolled
    // back here — confirm that is intended.
    let new_id = match store.publish(&patched) {
        Ok(id) => id, Err(e) => return error_response(500, format!("publish: {e}")),
    };
    if req.activate {
        if let Err(e) = store.activate(&new_id) {
            return error_response(500, format!("activate: {e}"));
        }
    }

    // Effect names before and after the patch; non-function stages count as
    // having no effects.
    let original_effects: std::collections::BTreeSet<String> = match &original {
        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
        _ => std::collections::BTreeSet::new(),
    };
    let patched_effects: std::collections::BTreeSet<String> = match &patched {
        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
        _ => std::collections::BTreeSet::new(),
    };
    // Current head op of the branch (if any) becomes the new op's parent.
    let head_now = match store.get_branch(&branch) {
        Ok(b) => b.and_then(|b| b.head_op),
        Err(e) => return error_response(500, format!("get_branch: {e}")),
    };
    // A change in the effect set is recorded as `ChangeEffectSig`;
    // otherwise the patch is a `ModifyBody`.
    let kind = if original_effects != patched_effects {
        let from_budget = lex_vcs::operation_budget_from_effects(&original_effects);
        let to_budget = lex_vcs::operation_budget_from_effects(&patched_effects);
        lex_vcs::OperationKind::ChangeEffectSig {
            sig_id: sig.clone(),
            from_stage_id: req.stage_id.clone(),
            to_stage_id: new_id.clone(),
            from_effects: original_effects,
            to_effects: patched_effects,
            from_budget,
            to_budget,
        }
    } else {
        // Effects are unchanged, so one budget serves both sides.
        let budget = lex_vcs::operation_budget_from_effects(&original_effects);
        lex_vcs::OperationKind::ModifyBody {
            sig_id: sig.clone(),
            from_stage_id: req.stage_id.clone(),
            to_stage_id: new_id.clone(),
            from_budget: budget,
            to_budget: budget,
        }
    };
    let transition = lex_vcs::StageTransition::Replace {
        sig_id: sig.clone(),
        from: req.stage_id.clone(),
        to: new_id.clone(),
    };
    let op = lex_vcs::Operation::new(
        kind,
        head_now.into_iter().collect::<Vec<_>>(),
    );
    let op_id = match store.apply_operation(&branch, op, transition) {
        Ok(id) => id,
        Err(e) => return write_error_response("apply_operation", e),
    };

    // Status is the lowercase Debug name of the stage status; `Draft` is
    // used when the status cannot be read.
    let status = format!("{:?}",
        store.get_status(&new_id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
    json_response(200, &serde_json::json!({
        "old_stage_id": req.stage_id,
        "new_stage_id": new_id,
        "sig_id": sig,
        "status": status,
        "op_id": op_id,
    }))
}
473
474pub(crate) fn stage_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
475 let store = state.store.lock().unwrap();
476 let meta = match store.get_metadata(id) {
477 Ok(m) => m, Err(e) => return error_response(404, format!("{e}")),
478 };
479 let ast = match store.get_ast(id) {
480 Ok(a) => a, Err(e) => return error_response(404, format!("{e}")),
481 };
482 let status = format!("{:?}", store.get_status(id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
483 json_response(200, &serde_json::json!({
484 "metadata": meta,
485 "ast": ast,
486 "status": status,
487 }))
488}
489
490pub(crate) fn stage_attestations_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
499 let store = state.store.lock().unwrap();
500 if let Err(e) = store.get_metadata(id) {
501 return error_response(404, format!("{e}"));
502 }
503 let log = match store.attestation_log() {
504 Ok(l) => l,
505 Err(e) => return error_response(500, format!("attestation log: {e}")),
506 };
507 let mut listing = match log.list_for_stage(&id.to_string()) {
508 Ok(v) => v,
509 Err(e) => return error_response(500, format!("list_for_stage: {e}")),
510 };
511 listing.sort_by_key(|a| std::cmp::Reverse(a.timestamp));
512 json_response(200, &serde_json::json!({"attestations": listing}))
513}
514
/// JSON form of a run policy; every field defaults to empty/`None` when omitted.
#[derive(Deserialize, Default)]
struct PolicyJson {
    /// Effect names, collected into `Policy::allow_effects`.
    #[serde(default)] allow_effects: Vec<String>,
    /// Paths, converted to `PathBuf` for `Policy::allow_fs_read`.
    #[serde(default)] allow_fs_read: Vec<String>,
    /// Paths, converted to `PathBuf` for `Policy::allow_fs_write`.
    #[serde(default)] allow_fs_write: Vec<String>,
    /// Passed through unchanged as `Policy::budget`.
    #[serde(default)] budget: Option<u64>,
}
522
523impl PolicyJson {
524 fn into_policy(self) -> Policy {
525 Policy {
526 allow_effects: self.allow_effects.into_iter().collect::<BTreeSet<_>>(),
527 allow_fs_read: self.allow_fs_read.into_iter().map(PathBuf::from).collect(),
528 allow_fs_write: self.allow_fs_write.into_iter().map(PathBuf::from).collect(),
529 allow_net_host: Vec::new(),
530 allow_proc: Vec::new(),
531 budget: self.budget,
532 }
533 }
534}
535
/// Request body for `/v1/run` and `/v1/replay`.
#[derive(Deserialize)]
struct RunReq {
    /// Program source text.
    source: String,
    /// Name of the function to call; sent as `fn` in the JSON.
    #[serde(rename = "fn")] func: String,
    /// Call arguments as JSON values; defaults to an empty list.
    #[serde(default)] args: Vec<serde_json::Value>,
    /// Policy the run is checked and executed under; defaults to an empty policy.
    #[serde(default)] policy: PolicyJson,
    /// Overrides handed to the trace recorder; only used by `run_handler`
    /// when it is called with `with_overrides = true`.
    #[serde(default)] overrides: IndexMap<String, serde_json::Value>,
}
544
545pub(crate) fn run_handler(state: &State, body: &str, with_overrides: bool) -> Response<std::io::Cursor<Vec<u8>>> {
546 let req: RunReq = match serde_json::from_str(body) {
547 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
548 };
549 let prog = match load_program_from_str(&req.source) {
550 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
551 };
552 let stages = canonicalize_program(&prog);
553 if let Err(errs) = lex_types::check_program(&stages) {
554 return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
555 }
556 let bc = compile_program(&stages);
557 let policy = req.policy.into_policy();
558 if let Err(violations) = check_policy(&bc, &policy) {
559 return error_with_detail(403, "policy violation", serde_json::to_value(&violations).unwrap());
560 }
561
562 let mut recorder = lex_trace::Recorder::new();
563 if with_overrides && !req.overrides.is_empty() {
564 recorder = recorder.with_overrides(req.overrides);
565 }
566 let handle = recorder.handle();
567 let handler = DefaultHandler::new(policy);
568 let mut vm = Vm::with_handler(&bc, Box::new(handler));
569 vm.set_tracer(Box::new(recorder));
570
571 let vargs: Vec<Value> = req.args.iter().map(json_to_value).collect();
572 let started = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
573 let result = vm.call(&req.func, vargs);
574 let ended = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
575
576 let store = state.store.lock().unwrap();
577 let (root_out, root_err, status) = match &result {
578 Ok(v) => (Some(value_to_json(v)), None, 200u16),
579 Err(e) => (None, Some(format!("{e}")), 200u16),
580 };
581 let tree = handle.finalize(req.func.clone(), serde_json::Value::Null,
582 root_out.clone(), root_err.clone(), started, ended);
583 let run_id = match store.save_trace(&tree) {
584 Ok(id) => id,
585 Err(e) => return error_response(500, format!("save_trace: {e}")),
586 };
587
588 let mut body = serde_json::json!({
589 "run_id": run_id,
590 "output": root_out,
591 });
592 if let Some(err) = root_err {
593 body["error"] = serde_json::Value::String(err);
594 }
595 json_response(status, &body)
596}
597
598fn trace_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
599 let store = state.store.lock().unwrap();
600 match store.load_trace(id) {
601 Ok(t) => json_response(200, &serde_json::to_value(&t).unwrap()),
602 Err(e) => error_response(404, format!("{e}")),
603 }
604}
605
606fn diff_handler(state: &State, query: &str) -> Response<std::io::Cursor<Vec<u8>>> {
607 let mut a = None;
608 let mut b = None;
609 for kv in query.split('&') {
610 if let Some((k, v)) = kv.split_once('=') {
611 match k { "a" => a = Some(v.to_string()), "b" => b = Some(v.to_string()), _ => {} }
612 }
613 }
614 let (Some(a), Some(b)) = (a, b) else {
615 return error_response(400, "missing a or b query params");
616 };
617 let store = state.store.lock().unwrap();
618 let ta = match store.load_trace(&a) { Ok(t) => t, Err(e) => return error_response(404, format!("a: {e}")) };
619 let tb = match store.load_trace(&b) { Ok(t) => t, Err(e) => return error_response(404, format!("b: {e}")) };
620 match lex_trace::diff_runs(&ta, &tb) {
621 Some(d) => json_response(200, &serde_json::to_value(&d).unwrap()),
622 None => json_response(200, &serde_json::json!({"divergence": null})),
623 }
624}
625
/// Converts a JSON value into a bytecode `Value` via `Value::from_json`.
fn json_to_value(v: &serde_json::Value) -> Value { Value::from_json(v) }
627
/// Converts a bytecode `Value` into JSON via `Value::to_json`.
fn value_to_json(v: &Value) -> serde_json::Value { v.to_json() }
629
/// Request body for `/v1/merge/start`.
#[derive(Deserialize)]
struct MergeStartReq {
    /// Name of the branch being merged from.
    src_branch: String,
    /// Name of the branch being merged into.
    dst_branch: String,
}
635
636fn merge_start_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
647 let req: MergeStartReq = match serde_json::from_str(body) {
648 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
649 };
650 let store = state.store.lock().unwrap();
651 let src_head = match store.get_branch(&req.src_branch) {
652 Ok(Some(b)) => b.head_op,
653 Ok(None) => return error_response(404, format!("unknown src branch `{}`", req.src_branch)),
654 Err(e) => return error_response(500, format!("src branch read: {e}")),
655 };
656 let dst_head = match store.get_branch(&req.dst_branch) {
657 Ok(Some(b)) => b.head_op,
658 Ok(None) => return error_response(404, format!("unknown dst branch `{}`", req.dst_branch)),
659 Err(e) => return error_response(500, format!("dst branch read: {e}")),
660 };
661 let log = match lex_vcs::OpLog::open(store.root()) {
662 Ok(l) => l,
663 Err(e) => return error_response(500, format!("op log: {e}")),
664 };
665 let merge_id = mint_merge_id();
669 let session = match MergeSession::start(
670 merge_id.clone(),
671 &log,
672 src_head.as_ref(),
673 dst_head.as_ref(),
674 ) {
675 Ok(s) => s,
676 Err(e) => return error_response(500, format!("merge start: {e}")),
677 };
678 let conflicts: Vec<&lex_vcs::ConflictRecord> = session.remaining_conflicts();
679 let auto_resolved_count = session.auto_resolved.len();
680 let body = serde_json::json!({
681 "merge_id": merge_id,
682 "src_head": session.src_head,
683 "dst_head": session.dst_head,
684 "lca": session.lca,
685 "conflicts": conflicts,
686 "auto_resolved_count": auto_resolved_count,
687 });
688 drop(conflicts);
689 drop(store);
690 let wrapped = ApiMergeSession {
691 inner: session,
692 src_branch: req.src_branch,
693 dst_branch: req.dst_branch,
694 };
695 state.sessions.lock().unwrap().insert(merge_id, wrapped);
696 json_response(200, &body)
697}
698
/// Request body for `/v1/merge/<id>/resolve`.
#[derive(Deserialize)]
struct MergeResolveReq {
    /// Resolutions to apply, one per conflict.
    resolutions: Vec<MergeResolveEntry>,
}
707
/// One (conflict, resolution) pair in a resolve request.
#[derive(Deserialize)]
struct MergeResolveEntry {
    /// Id of the conflict being resolved.
    conflict_id: String,
    /// The resolution chosen for that conflict.
    resolution: lex_vcs::Resolution,
}
713
714fn merge_resolve_handler(
725 state: &State,
726 merge_id: &str,
727 body: &str,
728) -> Response<std::io::Cursor<Vec<u8>>> {
729 let req: MergeResolveReq = match serde_json::from_str(body) {
730 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
731 };
732 let mut sessions = state.sessions.lock().unwrap();
733 let Some(wrapped) = sessions.get_mut(merge_id) else {
734 return error_response(404, format!("unknown merge_id `{merge_id}`"));
735 };
736 let pairs: Vec<(String, lex_vcs::Resolution)> = req.resolutions.into_iter()
737 .map(|e| (e.conflict_id, e.resolution))
738 .collect();
739 let verdicts = wrapped.inner.resolve(pairs);
740 let remaining: Vec<&lex_vcs::ConflictRecord> = wrapped.inner.remaining_conflicts();
741 let body = serde_json::json!({
742 "verdicts": verdicts,
743 "remaining_conflicts": remaining,
744 });
745 json_response(200, &body)
746}
747
/// Commits a merge session: turns auto-resolved outcomes and the caller's
/// resolutions into a sig→stage map and applies a single `Merge` operation
/// to the session's destination branch.
///
/// Responses:
/// * 404 — unknown merge id.
/// * 422 — conflicts remain, or a custom op does not map to the conflict's sig.
/// * 500 — a take-theirs lookup fails, or a `Defer` resolution reaches this point.
/// * `apply_operation` failures are mapped by `write_error_response`.
/// * 200 — the new head op and the destination branch name.
fn merge_commit_handler(
    state: &State,
    merge_id: &str,
) -> Response<std::io::Cursor<Vec<u8>>> {
    use std::collections::BTreeMap;
    // The session is removed from the map up front.
    // NOTE(review): every early return below (including the 422 for
    // remaining conflicts) leaves the session removed — confirm intended.
    let wrapped = match state.sessions.lock().unwrap().remove(merge_id) {
        Some(w) => w,
        None => return error_response(404, format!("unknown merge_id `{merge_id}`")),
    };
    // Copy out what is needed after `commit()` is called on the session.
    let dst_branch = wrapped.dst_branch.clone();
    let src_head = wrapped.inner.src_head.clone();
    let dst_head = wrapped.inner.dst_head.clone();
    let auto_resolved = wrapped.inner.auto_resolved.clone();

    // Sig → stage entries that make up the merge transition.
    let mut entries: BTreeMap<lex_vcs::SigId, Option<lex_vcs::StageId>> = BTreeMap::new();

    // Only auto-resolved outcomes of the `Src` kind contribute an entry.
    for outcome in &auto_resolved {
        if let lex_vcs::MergeOutcome::Src { sig_id, stage_id } = outcome {
            entries.insert(sig_id.clone(), stage_id.clone());
        }
    }

    let resolved = match wrapped.inner.commit() {
        Ok(r) => r,
        Err(lex_vcs::CommitError::ConflictsRemaining(ids)) => {
            return error_with_detail(
                422,
                "conflicts remaining",
                serde_json::json!({"unresolved": ids}),
            );
        }
    };

    for (conflict_id, resolution) in resolved {
        match resolution {
            lex_vcs::Resolution::TakeOurs => {
                // Nothing is recorded for this sig.
            }
            lex_vcs::Resolution::TakeTheirs => {
                // Look up the stage the source side currently maps this sig to.
                match resolve_take_theirs(state, &src_head, &conflict_id) {
                    Ok(stage_id) => {
                        entries.insert(conflict_id.clone(), stage_id);
                    }
                    Err(e) => return error_response(500, format!("resolve take_theirs: {e}")),
                }
            }
            lex_vcs::Resolution::Custom { op } => {
                match op.kind.merge_target() {
                    Some((sig, stage)) => {
                        // The custom op must target the sig the conflict is about.
                        if sig != conflict_id {
                            return error_with_detail(
                                422,
                                "custom op targets a different sig than the conflict",
                                serde_json::json!({
                                    "conflict_id": conflict_id,
                                    "op_targets": sig,
                                }),
                            );
                        }
                        entries.insert(conflict_id, stage);
                    }
                    None => {
                        return error_with_detail(
                            422,
                            "custom op kind doesn't yield a single sig→stage delta",
                            serde_json::json!({
                                "conflict_id": conflict_id,
                                "kind": serde_json::to_value(&op.kind).unwrap_or(serde_json::Value::Null),
                            }),
                        );
                    }
                }
            }
            lex_vcs::Resolution::Defer => {
                return error_response(500, "internal: Defer slipped past commit gate");
            }
        }
    }

    let resolved_count = entries.len();
    // Parents are listed destination head first, then source head.
    let mut parents: Vec<lex_vcs::OpId> = Vec::new();
    if let Some(d) = dst_head { parents.push(d); }
    if let Some(s) = src_head { parents.push(s); }
    let op = lex_vcs::Operation::new(
        lex_vcs::OperationKind::Merge { resolved: resolved_count },
        parents,
    );
    let transition = lex_vcs::StageTransition::Merge { entries };
    let store = state.store.lock().unwrap();
    match store.apply_operation(&dst_branch, op, transition) {
        Ok(new_head_op) => json_response(200, &serde_json::json!({
            "new_head_op": new_head_op,
            "dst_branch": dst_branch,
        })),
        Err(e) => write_error_response("apply merge op", e),
    }
}
888
889fn resolve_take_theirs(
894 state: &State,
895 src_head: &Option<lex_vcs::OpId>,
896 sig: &lex_vcs::SigId,
897) -> std::io::Result<Option<lex_vcs::StageId>> {
898 let store = state.store.lock().unwrap();
899 let log = lex_vcs::OpLog::open(store.root())?;
900 let Some(head) = src_head.as_ref() else { return Ok(None); };
901 let mut current: Option<lex_vcs::StageId> = None;
904 for record in log.walk_forward(head, None)? {
905 match &record.produces {
906 lex_vcs::StageTransition::Create { sig_id, stage_id }
907 if sig_id == sig => { current = Some(stage_id.clone()); }
908 lex_vcs::StageTransition::Replace { sig_id, to, .. }
909 if sig_id == sig => { current = Some(to.clone()); }
910 lex_vcs::StageTransition::Remove { sig_id, .. }
911 if sig_id == sig => { current = None; }
912 lex_vcs::StageTransition::Rename { from, to, body_stage_id }
913 if from == sig || to == sig => {
914 if from == sig { current = None; }
915 if to == sig { current = Some(body_stage_id.clone()); }
916 }
917 lex_vcs::StageTransition::Merge { entries } => {
918 if let Some(opt) = entries.get(sig) {
919 current = opt.clone();
920 }
921 }
922 _ => {}
923 }
924 }
925 Ok(current)
926}
927
928fn mint_merge_id() -> MergeSessionId {
929 use std::sync::atomic::{AtomicU64, Ordering};
930 static COUNTER: AtomicU64 = AtomicU64::new(0);
931 let nanos = SystemTime::now()
932 .duration_since(UNIX_EPOCH)
933 .map(|d| d.as_nanos())
934 .unwrap_or(0);
935 let n = COUNTER.fetch_add(1, Ordering::Relaxed);
936 format!("merge_{nanos:x}_{n:x}")
937}
938
939pub(crate) fn ops_batch_handler(state: &State, body: &str)
970 -> Response<std::io::Cursor<Vec<u8>>>
971{
972 let records: Vec<lex_vcs::OperationRecord> = match serde_json::from_str(body) {
973 Ok(r) => r,
974 Err(e) => return error_response(400,
975 format!("body must be a JSON array of OperationRecord: {e}")),
976 };
977 let store = state.store.lock().unwrap();
978 let log = match lex_vcs::OpLog::open(store.root()) {
979 Ok(l) => l,
980 Err(e) => return error_response(500, format!("opening op log: {e}")),
981 };
982
983 let mut batch_ids: std::collections::BTreeSet<lex_vcs::OpId> =
991 std::collections::BTreeSet::new();
992 for rec in &records {
993 let expected = rec.op.op_id();
994 if expected != rec.op_id {
995 return error_with_detail(409, "OpIdMismatch", serde_json::json!({
996 "supplied": rec.op_id,
997 "expected": expected,
998 }));
999 }
1000 for parent in &rec.op.parents {
1001 let known = match log.get(parent) {
1002 Ok(Some(_)) => true,
1003 Ok(None) => false,
1004 Err(e) => return error_response(500, format!("op log read: {e}")),
1005 };
1006 if !known && !batch_ids.contains(parent) {
1007 return error_with_detail(422, "MissingParent", serde_json::json!({
1008 "op_id": rec.op_id,
1009 "missing_parent": parent,
1010 }));
1011 }
1012 }
1013 batch_ids.insert(rec.op_id.clone());
1014 }
1015
1016 let mut added = 0usize;
1019 let mut added_ids: Vec<&lex_vcs::OpId> = Vec::new();
1020 for rec in &records {
1021 let already_present = matches!(log.get(&rec.op_id), Ok(Some(_)));
1022 match log.put(rec) {
1023 Ok(()) => {
1024 if !already_present {
1025 added += 1;
1026 added_ids.push(&rec.op_id);
1027 }
1028 }
1029 Err(e) => return error_response(500, format!("op log write: {e}")),
1030 }
1031 }
1032
1033 json_response(200, &serde_json::json!({
1034 "received": records.len(),
1035 "added": added,
1036 "skipped": records.len() - added,
1037 "added_ids": added_ids,
1038 }))
1039}
1040
1041pub(crate) fn attestations_batch_handler(state: &State, body: &str)
1063 -> Response<std::io::Cursor<Vec<u8>>>
1064{
1065 let attestations: Vec<lex_vcs::Attestation> = match serde_json::from_str(body) {
1066 Ok(a) => a,
1067 Err(e) => return error_response(400,
1068 format!("body must be a JSON array of Attestation: {e}")),
1069 };
1070 let store = state.store.lock().unwrap();
1071 let log = match store.attestation_log() {
1072 Ok(l) => l,
1073 Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1074 };
1075 let op_log = match lex_vcs::OpLog::open(store.root()) {
1076 Ok(l) => l,
1077 Err(e) => return error_response(500, format!("opening op log: {e}")),
1078 };
1079
1080 for att in &attestations {
1082 let expected = lex_vcs::Attestation::with_timestamp(
1085 att.stage_id.clone(),
1086 att.op_id.clone(),
1087 att.intent_id.clone(),
1088 att.kind.clone(),
1089 att.result.clone(),
1090 att.produced_by.clone(),
1091 att.cost.clone(),
1092 att.timestamp,
1093 ).attestation_id;
1094 if expected != att.attestation_id {
1095 return error_with_detail(409, "AttestationIdMismatch", serde_json::json!({
1096 "supplied": att.attestation_id,
1097 "expected": expected,
1098 }));
1099 }
1100 if let Some(op_id) = &att.op_id {
1104 match op_log.get(op_id) {
1105 Ok(Some(_)) => {}
1106 Ok(None) => return error_with_detail(422, "UnknownOp", serde_json::json!({
1107 "attestation_id": att.attestation_id,
1108 "op_id": op_id,
1109 })),
1110 Err(e) => return error_response(500, format!("op log read: {e}")),
1111 }
1112 }
1113 }
1114
1115 let mut added = 0usize;
1119 let mut added_ids: Vec<&lex_vcs::AttestationId> = Vec::new();
1120 for att in &attestations {
1121 let already_present = matches!(log.get(&att.attestation_id), Ok(Some(_)));
1122 match log.put(att) {
1123 Ok(()) => {
1124 if !already_present {
1125 added += 1;
1126 added_ids.push(&att.attestation_id);
1127 }
1128 }
1129 Err(e) => return error_response(500, format!("attestation log write: {e}")),
1130 }
1131 }
1132
1133 json_response(200, &serde_json::json!({
1134 "received": attestations.len(),
1135 "added": added,
1136 "skipped": attestations.len() - added,
1137 "added_ids": added_ids,
1138 }))
1139}
1140
1141pub(crate) fn branch_head_handler(state: &State, name: &str)
1150 -> Response<std::io::Cursor<Vec<u8>>>
1151{
1152 let store = state.store.lock().unwrap();
1153 let head = match store.get_branch(name) {
1154 Ok(Some(b)) => b.head_op,
1155 Ok(None) => None,
1156 Err(e) => return error_response(500, format!("get_branch: {e}")),
1157 };
1158 json_response(200, &serde_json::json!({
1159 "branch": name,
1160 "head_op": head,
1161 }))
1162}
1163
1164pub(crate) fn ops_since_handler(state: &State, query: &str)
1188 -> Response<std::io::Cursor<Vec<u8>>>
1189{
1190 let mut after: Option<String> = None;
1191 let mut branch = String::from("main");
1192 let mut limit: Option<usize> = None;
1193 for kv in query.split('&') {
1194 let Some((k, v)) = kv.split_once('=') else { continue };
1195 match k {
1196 "after" => after = Some(v.to_string()),
1197 "branch" => branch = v.to_string(),
1198 "limit" => {
1199 limit = Some(match v.parse::<usize>() {
1200 Ok(n) => n,
1201 Err(_) => return error_response(400,
1202 format!("limit must be a positive integer, got `{v}`")),
1203 });
1204 }
1205 _ => {}
1206 }
1207 }
1208
1209 let store = state.store.lock().unwrap();
1210 let log = match lex_vcs::OpLog::open(store.root()) {
1211 Ok(l) => l,
1212 Err(e) => return error_response(500, format!("opening op log: {e}")),
1213 };
1214 let head = match store.get_branch(&branch) {
1215 Ok(Some(b)) => b.head_op,
1216 Ok(None) => None,
1217 Err(e) => return error_response(500, format!("get_branch: {e}")),
1218 };
1219 let Some(head) = head else {
1220 return json_response(200, &serde_json::json!([]));
1221 };
1222
1223 let ops_since = match log.ops_since(&head, after.as_ref()) {
1224 Ok(o) => o,
1225 Err(e) => return error_response(500, format!("ops_since: {e}")),
1226 };
1227 let mut ops = ops_since;
1231 ops.reverse();
1232 if let Some(n) = limit {
1233 ops.truncate(n);
1234 }
1235
1236 json_response(200, &serde_json::to_value(&ops).unwrap_or_default())
1237}
1238
1239pub(crate) fn attestations_since_handler(state: &State, query: &str)
1252 -> Response<std::io::Cursor<Vec<u8>>>
1253{
1254 let mut after_op: Option<String> = None;
1255 let mut limit: Option<usize> = None;
1256 for kv in query.split('&') {
1257 let Some((k, v)) = kv.split_once('=') else { continue };
1258 match k {
1259 "after-op" => after_op = Some(v.to_string()),
1260 "limit" => {
1261 limit = Some(match v.parse::<usize>() {
1262 Ok(n) => n,
1263 Err(_) => return error_response(400,
1264 format!("limit must be a positive integer, got `{v}`")),
1265 });
1266 }
1267 _ => {}
1268 }
1269 }
1270
1271 let store = state.store.lock().unwrap();
1272 let log = match store.attestation_log() {
1273 Ok(l) => l,
1274 Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1275 };
1276
1277 let exclude: std::collections::BTreeSet<String> = match &after_op {
1281 None => std::collections::BTreeSet::new(),
1282 Some(cutoff) => {
1283 let op_log = match lex_vcs::OpLog::open(store.root()) {
1284 Ok(l) => l,
1285 Err(e) => return error_response(500, format!("opening op log: {e}")),
1286 };
1287 match op_log.walk_back(cutoff, None) {
1288 Ok(records) => records.into_iter().map(|r| r.op_id).collect(),
1289 Err(_) => {
1290 std::collections::BTreeSet::new()
1294 }
1295 }
1296 }
1297 };
1298
1299 let all = match log.list_all() {
1300 Ok(v) => v,
1301 Err(e) => return error_response(500, format!("listing attestations: {e}")),
1302 };
1303 let mut filtered: Vec<lex_vcs::Attestation> = all
1304 .into_iter()
1305 .filter(|a| match &a.op_id {
1306 Some(op_id) => !exclude.contains(op_id),
1307 None => true,
1311 })
1312 .collect();
1313 filtered.sort_by(|a, b| {
1317 a.timestamp.cmp(&b.timestamp)
1318 .then_with(|| a.attestation_id.cmp(&b.attestation_id))
1319 });
1320 if let Some(n) = limit {
1321 filtered.truncate(n);
1322 }
1323
1324 json_response(200, &serde_json::to_value(&filtered).unwrap_or_default())
1325}
1326