1use indexmap::IndexMap;
9use lex_ast::canonicalize_program;
10use lex_bytecode::{compile_program, vm::Vm, Value};
11use lex_runtime::{check_program as check_policy, DefaultHandler, Policy};
12use lex_store::Store;
13use lex_syntax::load_program_from_str;
14use lex_vcs::{MergeSession, MergeSessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeSet, HashMap};
17use std::path::PathBuf;
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tiny_http::{Header, Method, Request, Response};
21
22pub struct State {
23 pub store: Mutex<Store>,
24 pub root: PathBuf,
29 pub sessions: Mutex<HashMap<MergeSessionId, ApiMergeSession>>,
36}
37
38pub struct ApiMergeSession {
45 pub inner: MergeSession,
46 pub src_branch: String,
47 pub dst_branch: String,
48}
49
50impl State {
51 pub fn open(root: PathBuf) -> anyhow::Result<Self> {
52 Ok(Self {
53 store: Mutex::new(Store::open(&root)?),
54 root,
55 sessions: Mutex::new(HashMap::new()),
56 })
57 }
58}
59
60#[derive(Debug, Serialize, Deserialize)]
61struct ErrorEnvelope {
62 error: String,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 detail: Option<serde_json::Value>,
65}
66
67fn json_response(status: u16, body: &serde_json::Value) -> Response<std::io::Cursor<Vec<u8>>> {
68 let bytes = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
69 Response::from_data(bytes)
70 .with_status_code(status)
71 .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
72}
73
74fn error_response(status: u16, msg: impl Into<String>) -> Response<std::io::Cursor<Vec<u8>>> {
75 json_response(status, &serde_json::to_value(ErrorEnvelope {
76 error: msg.into(), detail: None,
77 }).unwrap())
78}
79
80fn error_with_detail(status: u16, msg: impl Into<String>, detail: serde_json::Value)
81 -> Response<std::io::Cursor<Vec<u8>>>
82{
83 json_response(status, &serde_json::to_value(ErrorEnvelope {
84 error: msg.into(), detail: Some(detail),
85 }).unwrap())
86}
87
88pub fn handle(state: Arc<State>, mut req: Request) -> std::io::Result<()> {
89 let method = req.method().clone();
90 let url = req.url().to_string();
91 let path = url.split('?').next().unwrap_or("").to_string();
92 let query = url.split_once('?').map(|(_, q)| q.to_string()).unwrap_or_default();
93
94 let x_lex_user = req.headers().iter()
99 .find(|h| h.field.equiv("x-lex-user"))
100 .map(|h| h.value.as_str().to_string());
101
102 let mut body = String::new();
103 let _ = req.as_reader().read_to_string(&mut body);
104
105 let resp = route(&state, &method, &path, &query, &body, x_lex_user.as_deref());
106 req.respond(resp)
107}
108
109fn route(
110 state: &State,
111 method: &Method,
112 path: &str,
113 query: &str,
114 body: &str,
115 x_lex_user: Option<&str>,
116) -> Response<std::io::Cursor<Vec<u8>>> {
117 match (method, path) {
118 (Method::Get, "/") => crate::web::activity_handler(state),
120 (Method::Get, "/web/branches") => crate::web::branches_handler(state),
121 (Method::Get, "/web/trust") => crate::web::trust_handler(state),
122 (Method::Get, "/web/attention") => crate::web::attention_handler(state),
123 (Method::Get, p) if p.starts_with("/web/branch/") => {
124 let name = &p["/web/branch/".len()..];
125 crate::web::branch_handler(state, name)
126 }
127 (Method::Get, p) if p.starts_with("/web/stage/") => {
128 let id = &p["/web/stage/".len()..];
129 crate::web::stage_html_handler(state, id)
130 }
131 (Method::Post, p) if p.starts_with("/web/stage/") && (
136 p.ends_with("/pin") || p.ends_with("/defer")
137 || p.ends_with("/block") || p.ends_with("/unblock")
138 ) => {
139 let prefix_len = "/web/stage/".len();
140 let last_slash = p.rfind('/').unwrap_or(p.len());
141 let id = &p[prefix_len..last_slash];
142 let verb = &p[last_slash + 1..];
143 let decision = match verb {
144 "pin" => crate::web::WebStageDecision::Pin,
145 "defer" => crate::web::WebStageDecision::Defer,
146 "block" => crate::web::WebStageDecision::Block,
147 "unblock" => crate::web::WebStageDecision::Unblock,
148 _ => unreachable!("matched in outer guard"),
149 };
150 crate::web::stage_decision_handler(state, id, body, decision, x_lex_user)
151 }
152 (Method::Get, "/v1/health") => json_response(200, &serde_json::json!({"ok": true})),
154 (Method::Post, "/v1/parse") => parse_handler(body),
155 (Method::Post, "/v1/check") => check_handler(body),
156 (Method::Post, "/v1/publish") => publish_handler(state, body),
157 (Method::Post, "/v1/patch") => patch_handler(state, body),
158 (Method::Get, p) if p.starts_with("/v1/stage/") => {
159 let suffix = &p["/v1/stage/".len()..];
160 if let Some(id) = suffix.strip_suffix("/attestations") {
163 stage_attestations_handler(state, id)
164 } else {
165 stage_handler(state, suffix)
166 }
167 }
168 (Method::Post, "/v1/run") => run_handler(state, body, false),
169 (Method::Post, "/v1/replay") => run_handler(state, body, true),
170 (Method::Get, p) if p.starts_with("/v1/trace/") => {
171 let id = &p["/v1/trace/".len()..];
172 trace_handler(state, id)
173 }
174 (Method::Get, "/v1/diff") => diff_handler(state, query),
175 (Method::Post, "/v1/merge/start") => merge_start_handler(state, body),
176 (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/resolve") => {
177 let id = &p["/v1/merge/".len()..p.len() - "/resolve".len()];
178 merge_resolve_handler(state, id, body)
179 }
180 (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/commit") => {
181 let id = &p["/v1/merge/".len()..p.len() - "/commit".len()];
182 merge_commit_handler(state, id)
183 }
184 (Method::Post, "/v1/ops/batch") => ops_batch_handler(state, body),
186 (Method::Post, "/v1/attestations/batch") => attestations_batch_handler(state, body),
187 (Method::Get, p) if p.starts_with("/v1/branches/") && p.ends_with("/head") => {
191 let name = &p["/v1/branches/".len()..p.len() - "/head".len()];
192 branch_head_handler(state, name)
193 }
194 _ => error_response(404, format!("unknown route: {method:?} {path}")),
195 }
196}
197
198#[derive(Deserialize)]
199struct ParseReq { source: String }
200
201fn parse_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
202 let req: ParseReq = match serde_json::from_str(body) {
203 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
204 };
205 match load_program_from_str(&req.source) {
206 Ok(prog) => {
207 let stages = canonicalize_program(&prog);
208 json_response(200, &serde_json::to_value(&stages).unwrap())
209 }
210 Err(e) => error_response(400, format!("syntax error: {e}")),
211 }
212}
213
214pub(crate) fn check_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
215 let req: ParseReq = match serde_json::from_str(body) {
216 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
217 };
218 let prog = match load_program_from_str(&req.source) {
219 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
220 };
221 let stages = canonicalize_program(&prog);
222 match lex_types::check_program(&stages) {
223 Ok(_) => json_response(200, &serde_json::json!({"ok": true})),
224 Err(errs) => json_response(422, &serde_json::to_value(&errs).unwrap()),
225 }
226}
227
228#[derive(Deserialize)]
229struct PublishReq { source: String, #[serde(default)] activate: bool }
230
231pub(crate) fn publish_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
232 let req: PublishReq = match serde_json::from_str(body) {
233 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
234 };
235 let prog = match load_program_from_str(&req.source) {
236 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
237 };
238 let mut stages = canonicalize_program(&prog);
242 if let Err(errs) = lex_types::check_and_rewrite_program(&mut stages) {
243 return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
244 }
245
246 let store = state.store.lock().unwrap();
247 let branch = store.current_branch();
248
249 let old_head = match store.branch_head(&branch) {
251 Ok(h) => h,
252 Err(e) => return error_response(500, format!("branch_head: {e}")),
253 };
254 let old_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = old_head.values()
255 .filter_map(|stg| store.get_ast(stg).ok())
256 .filter_map(|s| match s {
257 lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd)),
258 _ => None,
259 })
260 .collect();
261 let new_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = stages.iter()
262 .filter_map(|s| match s {
263 lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd.clone())),
264 _ => None,
265 })
266 .collect();
267 let report = lex_vcs::compute_diff(&old_fns, &new_fns, false);
268
269 let mut new_imports: lex_vcs::ImportMap = lex_vcs::ImportMap::new();
271 {
272 let entry = new_imports.entry("<source>".into()).or_default();
273 for s in &stages {
274 if let lex_ast::Stage::Import(im) = s {
275 entry.insert(im.reference.clone());
276 }
277 }
278 }
279
280 match store.publish_program(&branch, &stages, &report, &new_imports, req.activate) {
281 Ok(outcome) => json_response(200, &serde_json::json!({
282 "ops": outcome.ops,
283 "head_op": outcome.head_op,
284 })),
285 Err(lex_store::StoreError::TypeError(errs)) => {
293 error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap())
294 }
295 Err(e) => error_response(500, format!("publish_program: {e}")),
296 }
297}
298
299#[derive(Deserialize)]
300struct PatchReq {
301 stage_id: String,
302 patch: lex_ast::Patch,
303 #[serde(default)] activate: bool,
304}
305
306fn patch_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
309 let req: PatchReq = match serde_json::from_str(body) {
310 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
311 };
312 let store = state.store.lock().unwrap();
313
314 let original = match store.get_ast(&req.stage_id) {
316 Ok(s) => s, Err(e) => return error_response(404, format!("stage: {e}")),
317 };
318
319 let patched = match lex_ast::apply_patch(&original, &req.patch) {
321 Ok(s) => s,
322 Err(e) => return error_with_detail(422, "patch failed",
323 serde_json::to_value(&e).unwrap_or_default()),
324 };
325
326 let stages = vec![patched.clone()];
328 if let Err(errs) = lex_types::check_program(&stages) {
329 return error_with_detail(422, "type errors after patch",
330 serde_json::to_value(&errs).unwrap_or_default());
331 }
332
333 let branch = store.current_branch();
337
338 let sig = match lex_ast::sig_id(&patched) {
340 Some(s) => s,
341 None => return error_response(500, "patched stage has no sig_id"),
342 };
343
344 let new_id = match store.publish(&patched) {
345 Ok(id) => id, Err(e) => return error_response(500, format!("publish: {e}")),
346 };
347 if req.activate {
348 if let Err(e) = store.activate(&new_id) {
349 return error_response(500, format!("activate: {e}"));
350 }
351 }
352
353 let original_effects: std::collections::BTreeSet<String> = match &original {
355 lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
356 _ => std::collections::BTreeSet::new(),
357 };
358 let patched_effects: std::collections::BTreeSet<String> = match &patched {
359 lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
360 _ => std::collections::BTreeSet::new(),
361 };
362 let head_now = match store.get_branch(&branch) {
363 Ok(b) => b.and_then(|b| b.head_op),
364 Err(e) => return error_response(500, format!("get_branch: {e}")),
365 };
366 let kind = if original_effects != patched_effects {
367 let from_budget = lex_vcs::operation_budget_from_effects(&original_effects);
374 let to_budget = lex_vcs::operation_budget_from_effects(&patched_effects);
375 lex_vcs::OperationKind::ChangeEffectSig {
376 sig_id: sig.clone(),
377 from_stage_id: req.stage_id.clone(),
378 to_stage_id: new_id.clone(),
379 from_effects: original_effects,
380 to_effects: patched_effects,
381 from_budget,
382 to_budget,
383 }
384 } else {
385 let budget = lex_vcs::operation_budget_from_effects(&original_effects);
386 lex_vcs::OperationKind::ModifyBody {
387 sig_id: sig.clone(),
388 from_stage_id: req.stage_id.clone(),
389 to_stage_id: new_id.clone(),
390 from_budget: budget,
391 to_budget: budget,
392 }
393 };
394 let transition = lex_vcs::StageTransition::Replace {
395 sig_id: sig.clone(),
396 from: req.stage_id.clone(),
397 to: new_id.clone(),
398 };
399 let op = lex_vcs::Operation::new(
400 kind,
401 head_now.into_iter().collect::<Vec<_>>(),
402 );
403 let op_id = match store.apply_operation(&branch, op, transition) {
404 Ok(id) => id,
405 Err(e) => return error_response(500, format!("apply_operation: {e}")),
406 };
407
408 let status = format!("{:?}",
409 store.get_status(&new_id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
410 json_response(200, &serde_json::json!({
411 "old_stage_id": req.stage_id,
412 "new_stage_id": new_id,
413 "sig_id": sig,
414 "status": status,
415 "op_id": op_id,
416 }))
417}
418
419pub(crate) fn stage_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
420 let store = state.store.lock().unwrap();
421 let meta = match store.get_metadata(id) {
422 Ok(m) => m, Err(e) => return error_response(404, format!("{e}")),
423 };
424 let ast = match store.get_ast(id) {
425 Ok(a) => a, Err(e) => return error_response(404, format!("{e}")),
426 };
427 let status = format!("{:?}", store.get_status(id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
428 json_response(200, &serde_json::json!({
429 "metadata": meta,
430 "ast": ast,
431 "status": status,
432 }))
433}
434
435pub(crate) fn stage_attestations_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
444 let store = state.store.lock().unwrap();
445 if let Err(e) = store.get_metadata(id) {
446 return error_response(404, format!("{e}"));
447 }
448 let log = match store.attestation_log() {
449 Ok(l) => l,
450 Err(e) => return error_response(500, format!("attestation log: {e}")),
451 };
452 let mut listing = match log.list_for_stage(&id.to_string()) {
453 Ok(v) => v,
454 Err(e) => return error_response(500, format!("list_for_stage: {e}")),
455 };
456 listing.sort_by_key(|a| std::cmp::Reverse(a.timestamp));
457 json_response(200, &serde_json::json!({"attestations": listing}))
458}
459
460#[derive(Deserialize, Default)]
461struct PolicyJson {
462 #[serde(default)] allow_effects: Vec<String>,
463 #[serde(default)] allow_fs_read: Vec<String>,
464 #[serde(default)] allow_fs_write: Vec<String>,
465 #[serde(default)] budget: Option<u64>,
466}
467
468impl PolicyJson {
469 fn into_policy(self) -> Policy {
470 Policy {
471 allow_effects: self.allow_effects.into_iter().collect::<BTreeSet<_>>(),
472 allow_fs_read: self.allow_fs_read.into_iter().map(PathBuf::from).collect(),
473 allow_fs_write: self.allow_fs_write.into_iter().map(PathBuf::from).collect(),
474 allow_net_host: Vec::new(),
475 allow_proc: Vec::new(),
476 budget: self.budget,
477 }
478 }
479}
480
481#[derive(Deserialize)]
482struct RunReq {
483 source: String,
484 #[serde(rename = "fn")] func: String,
485 #[serde(default)] args: Vec<serde_json::Value>,
486 #[serde(default)] policy: PolicyJson,
487 #[serde(default)] overrides: IndexMap<String, serde_json::Value>,
488}
489
490pub(crate) fn run_handler(state: &State, body: &str, with_overrides: bool) -> Response<std::io::Cursor<Vec<u8>>> {
491 let req: RunReq = match serde_json::from_str(body) {
492 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
493 };
494 let prog = match load_program_from_str(&req.source) {
495 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
496 };
497 let stages = canonicalize_program(&prog);
498 if let Err(errs) = lex_types::check_program(&stages) {
499 return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
500 }
501 let bc = compile_program(&stages);
502 let policy = req.policy.into_policy();
503 if let Err(violations) = check_policy(&bc, &policy) {
504 return error_with_detail(403, "policy violation", serde_json::to_value(&violations).unwrap());
505 }
506
507 let mut recorder = lex_trace::Recorder::new();
508 if with_overrides && !req.overrides.is_empty() {
509 recorder = recorder.with_overrides(req.overrides);
510 }
511 let handle = recorder.handle();
512 let handler = DefaultHandler::new(policy);
513 let mut vm = Vm::with_handler(&bc, Box::new(handler));
514 vm.set_tracer(Box::new(recorder));
515
516 let vargs: Vec<Value> = req.args.iter().map(json_to_value).collect();
517 let started = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
518 let result = vm.call(&req.func, vargs);
519 let ended = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
520
521 let store = state.store.lock().unwrap();
522 let (root_out, root_err, status) = match &result {
523 Ok(v) => (Some(value_to_json(v)), None, 200u16),
524 Err(e) => (None, Some(format!("{e}")), 200u16),
525 };
526 let tree = handle.finalize(req.func.clone(), serde_json::Value::Null,
527 root_out.clone(), root_err.clone(), started, ended);
528 let run_id = match store.save_trace(&tree) {
529 Ok(id) => id,
530 Err(e) => return error_response(500, format!("save_trace: {e}")),
531 };
532
533 let mut body = serde_json::json!({
534 "run_id": run_id,
535 "output": root_out,
536 });
537 if let Some(err) = root_err {
538 body["error"] = serde_json::Value::String(err);
539 }
540 json_response(status, &body)
541}
542
543fn trace_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
544 let store = state.store.lock().unwrap();
545 match store.load_trace(id) {
546 Ok(t) => json_response(200, &serde_json::to_value(&t).unwrap()),
547 Err(e) => error_response(404, format!("{e}")),
548 }
549}
550
551fn diff_handler(state: &State, query: &str) -> Response<std::io::Cursor<Vec<u8>>> {
552 let mut a = None;
553 let mut b = None;
554 for kv in query.split('&') {
555 if let Some((k, v)) = kv.split_once('=') {
556 match k { "a" => a = Some(v.to_string()), "b" => b = Some(v.to_string()), _ => {} }
557 }
558 }
559 let (Some(a), Some(b)) = (a, b) else {
560 return error_response(400, "missing a or b query params");
561 };
562 let store = state.store.lock().unwrap();
563 let ta = match store.load_trace(&a) { Ok(t) => t, Err(e) => return error_response(404, format!("a: {e}")) };
564 let tb = match store.load_trace(&b) { Ok(t) => t, Err(e) => return error_response(404, format!("b: {e}")) };
565 match lex_trace::diff_runs(&ta, &tb) {
566 Some(d) => json_response(200, &serde_json::to_value(&d).unwrap()),
567 None => json_response(200, &serde_json::json!({"divergence": null})),
568 }
569}
570
571fn json_to_value(v: &serde_json::Value) -> Value { Value::from_json(v) }
572
573fn value_to_json(v: &Value) -> serde_json::Value { v.to_json() }
574
575#[derive(Deserialize)]
576struct MergeStartReq {
577 src_branch: String,
578 dst_branch: String,
579}
580
581fn merge_start_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
592 let req: MergeStartReq = match serde_json::from_str(body) {
593 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
594 };
595 let store = state.store.lock().unwrap();
596 let src_head = match store.get_branch(&req.src_branch) {
597 Ok(Some(b)) => b.head_op,
598 Ok(None) => return error_response(404, format!("unknown src branch `{}`", req.src_branch)),
599 Err(e) => return error_response(500, format!("src branch read: {e}")),
600 };
601 let dst_head = match store.get_branch(&req.dst_branch) {
602 Ok(Some(b)) => b.head_op,
603 Ok(None) => return error_response(404, format!("unknown dst branch `{}`", req.dst_branch)),
604 Err(e) => return error_response(500, format!("dst branch read: {e}")),
605 };
606 let log = match lex_vcs::OpLog::open(store.root()) {
607 Ok(l) => l,
608 Err(e) => return error_response(500, format!("op log: {e}")),
609 };
610 let merge_id = mint_merge_id();
614 let session = match MergeSession::start(
615 merge_id.clone(),
616 &log,
617 src_head.as_ref(),
618 dst_head.as_ref(),
619 ) {
620 Ok(s) => s,
621 Err(e) => return error_response(500, format!("merge start: {e}")),
622 };
623 let conflicts: Vec<&lex_vcs::ConflictRecord> = session.remaining_conflicts();
624 let auto_resolved_count = session.auto_resolved.len();
625 let body = serde_json::json!({
626 "merge_id": merge_id,
627 "src_head": session.src_head,
628 "dst_head": session.dst_head,
629 "lca": session.lca,
630 "conflicts": conflicts,
631 "auto_resolved_count": auto_resolved_count,
632 });
633 drop(conflicts);
634 drop(store);
635 let wrapped = ApiMergeSession {
636 inner: session,
637 src_branch: req.src_branch,
638 dst_branch: req.dst_branch,
639 };
640 state.sessions.lock().unwrap().insert(merge_id, wrapped);
641 json_response(200, &body)
642}
643
644#[derive(Deserialize)]
645struct MergeResolveReq {
646 resolutions: Vec<MergeResolveEntry>,
651}
652
653#[derive(Deserialize)]
654struct MergeResolveEntry {
655 conflict_id: String,
656 resolution: lex_vcs::Resolution,
657}
658
659fn merge_resolve_handler(
670 state: &State,
671 merge_id: &str,
672 body: &str,
673) -> Response<std::io::Cursor<Vec<u8>>> {
674 let req: MergeResolveReq = match serde_json::from_str(body) {
675 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
676 };
677 let mut sessions = state.sessions.lock().unwrap();
678 let Some(wrapped) = sessions.get_mut(merge_id) else {
679 return error_response(404, format!("unknown merge_id `{merge_id}`"));
680 };
681 let pairs: Vec<(String, lex_vcs::Resolution)> = req.resolutions.into_iter()
682 .map(|e| (e.conflict_id, e.resolution))
683 .collect();
684 let verdicts = wrapped.inner.resolve(pairs);
685 let remaining: Vec<&lex_vcs::ConflictRecord> = wrapped.inner.remaining_conflicts();
686 let body = serde_json::json!({
687 "verdicts": verdicts,
688 "remaining_conflicts": remaining,
689 });
690 json_response(200, &body)
691}
692
693fn merge_commit_handler(
712 state: &State,
713 merge_id: &str,
714) -> Response<std::io::Cursor<Vec<u8>>> {
715 use std::collections::BTreeMap;
716 let wrapped = match state.sessions.lock().unwrap().remove(merge_id) {
717 Some(w) => w,
718 None => return error_response(404, format!("unknown merge_id `{merge_id}`")),
719 };
720 let dst_branch = wrapped.dst_branch.clone();
721 let src_head = wrapped.inner.src_head.clone();
722 let dst_head = wrapped.inner.dst_head.clone();
723 let auto_resolved = wrapped.inner.auto_resolved.clone();
724
725 let mut entries: BTreeMap<lex_vcs::SigId, Option<lex_vcs::StageId>> = BTreeMap::new();
728
729 for outcome in &auto_resolved {
731 if let lex_vcs::MergeOutcome::Src { sig_id, stage_id } = outcome {
732 entries.insert(sig_id.clone(), stage_id.clone());
733 }
734 }
735
736 let resolved = match wrapped.inner.commit() {
738 Ok(r) => r,
739 Err(lex_vcs::CommitError::ConflictsRemaining(ids)) => {
740 return error_with_detail(
744 422,
745 "conflicts remaining",
746 serde_json::json!({"unresolved": ids}),
747 );
748 }
749 };
750
751 for (conflict_id, resolution) in resolved {
752 match resolution {
753 lex_vcs::Resolution::TakeOurs => {
754 }
756 lex_vcs::Resolution::TakeTheirs => {
757 match resolve_take_theirs(state, &src_head, &conflict_id) {
767 Ok(stage_id) => {
768 entries.insert(conflict_id.clone(), stage_id);
769 }
770 Err(e) => return error_response(500, format!("resolve take_theirs: {e}")),
771 }
772 }
773 lex_vcs::Resolution::Custom { op } => {
774 match op.kind.merge_target() {
783 Some((sig, stage)) => {
784 if sig != conflict_id {
785 return error_with_detail(
786 422,
787 "custom op targets a different sig than the conflict",
788 serde_json::json!({
789 "conflict_id": conflict_id,
790 "op_targets": sig,
791 }),
792 );
793 }
794 entries.insert(conflict_id, stage);
795 }
796 None => {
797 return error_with_detail(
798 422,
799 "custom op kind doesn't yield a single sig→stage delta",
800 serde_json::json!({
801 "conflict_id": conflict_id,
802 "kind": serde_json::to_value(&op.kind).unwrap_or(serde_json::Value::Null),
803 }),
804 );
805 }
806 }
807 }
808 lex_vcs::Resolution::Defer => {
809 return error_response(500, "internal: Defer slipped past commit gate");
811 }
812 }
813 }
814
815 let resolved_count = entries.len();
816 let mut parents: Vec<lex_vcs::OpId> = Vec::new();
817 if let Some(d) = dst_head { parents.push(d); }
818 if let Some(s) = src_head { parents.push(s); }
819 let op = lex_vcs::Operation::new(
820 lex_vcs::OperationKind::Merge { resolved: resolved_count },
821 parents,
822 );
823 let transition = lex_vcs::StageTransition::Merge { entries };
824 let store = state.store.lock().unwrap();
825 match store.apply_operation(&dst_branch, op, transition) {
826 Ok(new_head_op) => json_response(200, &serde_json::json!({
827 "new_head_op": new_head_op,
828 "dst_branch": dst_branch,
829 })),
830 Err(e) => error_response(500, format!("apply merge op: {e}")),
831 }
832}
833
834fn resolve_take_theirs(
839 state: &State,
840 src_head: &Option<lex_vcs::OpId>,
841 sig: &lex_vcs::SigId,
842) -> std::io::Result<Option<lex_vcs::StageId>> {
843 let store = state.store.lock().unwrap();
844 let log = lex_vcs::OpLog::open(store.root())?;
845 let Some(head) = src_head.as_ref() else { return Ok(None); };
846 let mut current: Option<lex_vcs::StageId> = None;
849 for record in log.walk_forward(head, None)? {
850 match &record.produces {
851 lex_vcs::StageTransition::Create { sig_id, stage_id }
852 if sig_id == sig => { current = Some(stage_id.clone()); }
853 lex_vcs::StageTransition::Replace { sig_id, to, .. }
854 if sig_id == sig => { current = Some(to.clone()); }
855 lex_vcs::StageTransition::Remove { sig_id, .. }
856 if sig_id == sig => { current = None; }
857 lex_vcs::StageTransition::Rename { from, to, body_stage_id }
858 if from == sig || to == sig => {
859 if from == sig { current = None; }
860 if to == sig { current = Some(body_stage_id.clone()); }
861 }
862 lex_vcs::StageTransition::Merge { entries } => {
863 if let Some(opt) = entries.get(sig) {
864 current = opt.clone();
865 }
866 }
867 _ => {}
868 }
869 }
870 Ok(current)
871}
872
873fn mint_merge_id() -> MergeSessionId {
874 use std::sync::atomic::{AtomicU64, Ordering};
875 static COUNTER: AtomicU64 = AtomicU64::new(0);
876 let nanos = SystemTime::now()
877 .duration_since(UNIX_EPOCH)
878 .map(|d| d.as_nanos())
879 .unwrap_or(0);
880 let n = COUNTER.fetch_add(1, Ordering::Relaxed);
881 format!("merge_{nanos:x}_{n:x}")
882}
883
884pub(crate) fn ops_batch_handler(state: &State, body: &str)
915 -> Response<std::io::Cursor<Vec<u8>>>
916{
917 let records: Vec<lex_vcs::OperationRecord> = match serde_json::from_str(body) {
918 Ok(r) => r,
919 Err(e) => return error_response(400,
920 format!("body must be a JSON array of OperationRecord: {e}")),
921 };
922 let store = state.store.lock().unwrap();
923 let log = match lex_vcs::OpLog::open(store.root()) {
924 Ok(l) => l,
925 Err(e) => return error_response(500, format!("opening op log: {e}")),
926 };
927
928 let mut batch_ids: std::collections::BTreeSet<lex_vcs::OpId> =
936 std::collections::BTreeSet::new();
937 for rec in &records {
938 let expected = rec.op.op_id();
939 if expected != rec.op_id {
940 return error_with_detail(409, "OpIdMismatch", serde_json::json!({
941 "supplied": rec.op_id,
942 "expected": expected,
943 }));
944 }
945 for parent in &rec.op.parents {
946 let known = match log.get(parent) {
947 Ok(Some(_)) => true,
948 Ok(None) => false,
949 Err(e) => return error_response(500, format!("op log read: {e}")),
950 };
951 if !known && !batch_ids.contains(parent) {
952 return error_with_detail(422, "MissingParent", serde_json::json!({
953 "op_id": rec.op_id,
954 "missing_parent": parent,
955 }));
956 }
957 }
958 batch_ids.insert(rec.op_id.clone());
959 }
960
961 let mut added = 0usize;
964 let mut added_ids: Vec<&lex_vcs::OpId> = Vec::new();
965 for rec in &records {
966 let already_present = matches!(log.get(&rec.op_id), Ok(Some(_)));
967 match log.put(rec) {
968 Ok(()) => {
969 if !already_present {
970 added += 1;
971 added_ids.push(&rec.op_id);
972 }
973 }
974 Err(e) => return error_response(500, format!("op log write: {e}")),
975 }
976 }
977
978 json_response(200, &serde_json::json!({
979 "received": records.len(),
980 "added": added,
981 "skipped": records.len() - added,
982 "added_ids": added_ids,
983 }))
984}
985
986pub(crate) fn attestations_batch_handler(state: &State, body: &str)
1008 -> Response<std::io::Cursor<Vec<u8>>>
1009{
1010 let attestations: Vec<lex_vcs::Attestation> = match serde_json::from_str(body) {
1011 Ok(a) => a,
1012 Err(e) => return error_response(400,
1013 format!("body must be a JSON array of Attestation: {e}")),
1014 };
1015 let store = state.store.lock().unwrap();
1016 let log = match store.attestation_log() {
1017 Ok(l) => l,
1018 Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1019 };
1020 let op_log = match lex_vcs::OpLog::open(store.root()) {
1021 Ok(l) => l,
1022 Err(e) => return error_response(500, format!("opening op log: {e}")),
1023 };
1024
1025 for att in &attestations {
1027 let expected = lex_vcs::Attestation::with_timestamp(
1030 att.stage_id.clone(),
1031 att.op_id.clone(),
1032 att.intent_id.clone(),
1033 att.kind.clone(),
1034 att.result.clone(),
1035 att.produced_by.clone(),
1036 att.cost.clone(),
1037 att.timestamp,
1038 ).attestation_id;
1039 if expected != att.attestation_id {
1040 return error_with_detail(409, "AttestationIdMismatch", serde_json::json!({
1041 "supplied": att.attestation_id,
1042 "expected": expected,
1043 }));
1044 }
1045 if let Some(op_id) = &att.op_id {
1049 match op_log.get(op_id) {
1050 Ok(Some(_)) => {}
1051 Ok(None) => return error_with_detail(422, "UnknownOp", serde_json::json!({
1052 "attestation_id": att.attestation_id,
1053 "op_id": op_id,
1054 })),
1055 Err(e) => return error_response(500, format!("op log read: {e}")),
1056 }
1057 }
1058 }
1059
1060 let mut added = 0usize;
1064 let mut added_ids: Vec<&lex_vcs::AttestationId> = Vec::new();
1065 for att in &attestations {
1066 let already_present = matches!(log.get(&att.attestation_id), Ok(Some(_)));
1067 match log.put(att) {
1068 Ok(()) => {
1069 if !already_present {
1070 added += 1;
1071 added_ids.push(&att.attestation_id);
1072 }
1073 }
1074 Err(e) => return error_response(500, format!("attestation log write: {e}")),
1075 }
1076 }
1077
1078 json_response(200, &serde_json::json!({
1079 "received": attestations.len(),
1080 "added": added,
1081 "skipped": attestations.len() - added,
1082 "added_ids": added_ids,
1083 }))
1084}
1085
1086pub(crate) fn branch_head_handler(state: &State, name: &str)
1095 -> Response<std::io::Cursor<Vec<u8>>>
1096{
1097 let store = state.store.lock().unwrap();
1098 let head = match store.get_branch(name) {
1099 Ok(Some(b)) => b.head_op,
1100 Ok(None) => None,
1101 Err(e) => return error_response(500, format!("get_branch: {e}")),
1102 };
1103 json_response(200, &serde_json::json!({
1104 "branch": name,
1105 "head_op": head,
1106 }))
1107}
1108