1use indexmap::IndexMap;
9use lex_ast::canonicalize_program;
10use lex_bytecode::{compile_program, vm::Vm, Value};
11use lex_runtime::{check_program as check_policy, DefaultHandler, Policy};
12use lex_store::Store;
13use lex_syntax::load_program_from_str;
14use lex_vcs::{MergeSession, MergeSessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeSet, HashMap};
17use std::path::PathBuf;
18use std::sync::{Arc, Mutex};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tiny_http::{Header, Method, Request, Response};
21
/// Shared server state passed to every request handler.
pub struct State {
    /// The stage store; handlers lock this mutex while they read or write it.
    pub store: Mutex<Store>,
    /// The root path the store was opened from (see `State::open`).
    pub root: PathBuf,
    /// Merge sessions currently held by the server, keyed by merge id.
    pub sessions: Mutex<HashMap<MergeSessionId, ApiMergeSession>>,
}
37
/// A merge session as tracked by the HTTP API: the underlying
/// `MergeSession` plus the names of the two branches it was started for.
pub struct ApiMergeSession {
    /// The wrapped merge session.
    pub inner: MergeSession,
    /// Name of the source branch supplied when the merge was started.
    pub src_branch: String,
    /// Name of the destination branch supplied when the merge was started.
    pub dst_branch: String,
}
49
50impl State {
51 pub fn open(root: PathBuf) -> anyhow::Result<Self> {
52 Ok(Self {
53 store: Mutex::new(Store::open(&root)?),
54 root,
55 sessions: Mutex::new(HashMap::new()),
56 })
57 }
58}
59
/// JSON body used for error responses.
#[derive(Debug, Serialize, Deserialize)]
struct ErrorEnvelope {
    /// Human-readable error message.
    error: String,
    /// Optional structured detail; omitted from the serialized JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    detail: Option<serde_json::Value>,
}
66
67fn json_response(status: u16, body: &serde_json::Value) -> Response<std::io::Cursor<Vec<u8>>> {
68 let bytes = serde_json::to_vec(body).unwrap_or_else(|_| b"{}".to_vec());
69 Response::from_data(bytes)
70 .with_status_code(status)
71 .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
72}
73
74fn error_response(status: u16, msg: impl Into<String>) -> Response<std::io::Cursor<Vec<u8>>> {
75 json_response(status, &serde_json::to_value(ErrorEnvelope {
76 error: msg.into(), detail: None,
77 }).unwrap())
78}
79
80fn error_with_detail(status: u16, msg: impl Into<String>, detail: serde_json::Value)
81 -> Response<std::io::Cursor<Vec<u8>>>
82{
83 json_response(status, &serde_json::to_value(ErrorEnvelope {
84 error: msg.into(), detail: Some(detail),
85 }).unwrap())
86}
87
88fn write_error_response(prefix: &str, err: lex_store::StoreError)
94 -> Response<std::io::Cursor<Vec<u8>>>
95{
96 if let lex_store::StoreError::Contention { branch, attempts } = &err {
97 let body = serde_json::to_vec(&ErrorEnvelope {
98 error: format!("{prefix}: branch '{branch}' is contended (attempts={attempts})"),
99 detail: Some(serde_json::json!({
100 "kind": "contention",
101 "branch": branch,
102 "attempts": attempts,
103 })),
104 }).unwrap_or_else(|_| b"{}".to_vec());
105 return Response::from_data(body)
106 .with_status_code(503)
107 .with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
108 .with_header(Header::from_bytes(&b"Retry-After"[..], &b"1"[..]).unwrap());
109 }
110 error_response(500, format!("{prefix}: {err}"))
111}
112
113pub fn handle(state: Arc<State>, mut req: Request) -> std::io::Result<()> {
114 let method = req.method().clone();
115 let url = req.url().to_string();
116 let path = url.split('?').next().unwrap_or("").to_string();
117 let query = url.split_once('?').map(|(_, q)| q.to_string()).unwrap_or_default();
118
119 let x_lex_user = req.headers().iter()
124 .find(|h| h.field.equiv("x-lex-user"))
125 .map(|h| h.value.as_str().to_string());
126
127 let mut body = String::new();
128 let _ = req.as_reader().read_to_string(&mut body);
129
130 let resp = route(&state, &method, &path, &query, &body, x_lex_user.as_deref());
131 req.respond(resp)
132}
133
134fn route(
135 state: &State,
136 method: &Method,
137 path: &str,
138 query: &str,
139 body: &str,
140 x_lex_user: Option<&str>,
141) -> Response<std::io::Cursor<Vec<u8>>> {
142 match (method, path) {
143 (Method::Get, "/") => crate::web::activity_handler(state),
145 (Method::Get, "/web/branches") => crate::web::branches_handler(state),
146 (Method::Get, "/web/trust") => crate::web::trust_handler(state),
147 (Method::Get, "/web/attention") => crate::web::attention_handler(state),
148 (Method::Get, p) if p.starts_with("/web/branch/") => {
149 let name = &p["/web/branch/".len()..];
150 crate::web::branch_handler(state, name)
151 }
152 (Method::Get, p) if p.starts_with("/web/stage/") => {
153 let id = &p["/web/stage/".len()..];
154 crate::web::stage_html_handler(state, id)
155 }
156 (Method::Post, p) if p.starts_with("/web/stage/") && (
161 p.ends_with("/pin") || p.ends_with("/defer")
162 || p.ends_with("/block") || p.ends_with("/unblock")
163 ) => {
164 let prefix_len = "/web/stage/".len();
165 let last_slash = p.rfind('/').unwrap_or(p.len());
166 let id = &p[prefix_len..last_slash];
167 let verb = &p[last_slash + 1..];
168 let decision = match verb {
169 "pin" => crate::web::WebStageDecision::Pin,
170 "defer" => crate::web::WebStageDecision::Defer,
171 "block" => crate::web::WebStageDecision::Block,
172 "unblock" => crate::web::WebStageDecision::Unblock,
173 _ => unreachable!("matched in outer guard"),
174 };
175 crate::web::stage_decision_handler(state, id, body, decision, x_lex_user)
176 }
177 (Method::Get, "/v1/health") => json_response(200, &serde_json::json!({"ok": true})),
179 (Method::Post, "/v1/parse") => parse_handler(body),
180 (Method::Post, "/v1/check") => check_handler(body),
181 (Method::Post, "/v1/publish") => publish_handler(state, body),
182 (Method::Post, "/v1/patch") => patch_handler(state, body),
183 (Method::Get, p) if p.starts_with("/v1/stage/") => {
184 let suffix = &p["/v1/stage/".len()..];
185 if let Some(id) = suffix.strip_suffix("/attestations") {
188 stage_attestations_handler(state, id)
189 } else {
190 stage_handler(state, suffix)
191 }
192 }
193 (Method::Post, "/v1/run") => run_handler(state, body, false),
194 (Method::Post, "/v1/replay") => run_handler(state, body, true),
195 (Method::Get, p) if p.starts_with("/v1/trace/") => {
196 let id = &p["/v1/trace/".len()..];
197 trace_handler(state, id)
198 }
199 (Method::Get, "/v1/diff") => diff_handler(state, query),
200 (Method::Post, "/v1/merge/start") => merge_start_handler(state, body),
201 (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/resolve") => {
202 let id = &p["/v1/merge/".len()..p.len() - "/resolve".len()];
203 merge_resolve_handler(state, id, body)
204 }
205 (Method::Post, p) if p.starts_with("/v1/merge/") && p.ends_with("/commit") => {
206 let id = &p["/v1/merge/".len()..p.len() - "/commit".len()];
207 merge_commit_handler(state, id)
208 }
209 (Method::Post, "/v1/ops/batch") => ops_batch_handler(state, body),
211 (Method::Post, "/v1/attestations/batch") => attestations_batch_handler(state, body),
212 (Method::Get, p) if p.starts_with("/v1/branches/") && p.ends_with("/head") => {
216 let name = &p["/v1/branches/".len()..p.len() - "/head".len()];
217 branch_head_handler(state, name)
218 }
219 (Method::Get, "/v1/ops/since") => ops_since_handler(state, query),
223 (Method::Get, "/v1/attestations/since") => attestations_since_handler(state, query),
224 _ => error_response(404, format!("unknown route: {method:?} {path}")),
225 }
226}
227
/// Request body for `/v1/parse` and `/v1/check`: the program source text.
#[derive(Deserialize)]
struct ParseReq { source: String }
230
231fn parse_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
232 let req: ParseReq = match serde_json::from_str(body) {
233 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
234 };
235 match load_program_from_str(&req.source) {
236 Ok(prog) => {
237 let stages = canonicalize_program(&prog);
238 json_response(200, &serde_json::to_value(&stages).unwrap())
239 }
240 Err(e) => error_response(400, format!("syntax error: {e}")),
241 }
242}
243
244pub(crate) fn check_handler(body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
245 let req: ParseReq = match serde_json::from_str(body) {
246 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
247 };
248 let prog = match load_program_from_str(&req.source) {
249 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
250 };
251 let stages = canonicalize_program(&prog);
252 match lex_types::check_program(&stages) {
253 Ok(_) => json_response(200, &serde_json::json!({"ok": true})),
254 Err(errs) => json_response(422, &serde_json::to_value(&errs).unwrap()),
255 }
256}
257
/// Request body for `/v1/publish`: the program source plus an `activate`
/// flag that defaults to `false` when omitted.
#[derive(Deserialize)]
struct PublishReq { source: String, #[serde(default)] activate: bool }
260
261pub(crate) fn publish_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
262 let req: PublishReq = match serde_json::from_str(body) {
263 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
264 };
265 let prog = match load_program_from_str(&req.source) {
266 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
267 };
268 let mut stages = canonicalize_program(&prog);
272 if let Err(errs) = lex_types::check_and_rewrite_program(&mut stages) {
273 return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
274 }
275
276 let store = state.store.lock().unwrap();
277 let branch = store.current_branch();
278
279 let old_head = match store.branch_head(&branch) {
281 Ok(h) => h,
282 Err(e) => return error_response(500, format!("branch_head: {e}")),
283 };
284 let old_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = old_head.values()
285 .filter_map(|stg| store.get_ast(stg).ok())
286 .filter_map(|s| match s {
287 lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd)),
288 _ => None,
289 })
290 .collect();
291 let new_fns: std::collections::BTreeMap<String, lex_ast::FnDecl> = stages.iter()
292 .filter_map(|s| match s {
293 lex_ast::Stage::FnDecl(fd) => Some((fd.name.clone(), fd.clone())),
294 _ => None,
295 })
296 .collect();
297 let report = lex_vcs::compute_diff(&old_fns, &new_fns, false);
298
299 let mut new_imports: lex_vcs::ImportMap = lex_vcs::ImportMap::new();
301 {
302 let entry = new_imports.entry("<source>".into()).or_default();
303 for s in &stages {
304 if let lex_ast::Stage::Import(im) = s {
305 entry.insert(im.reference.clone());
306 }
307 }
308 }
309
310 match store.publish_program(&branch, &stages, &report, &new_imports, req.activate) {
311 Ok(outcome) => json_response(200, &serde_json::json!({
312 "ops": outcome.ops,
313 "head_op": outcome.head_op,
314 })),
315 Err(lex_store::StoreError::TypeError(errs)) => {
323 error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap())
324 }
325 Err(e) => write_error_response("publish_program", e),
326 }
327}
328
/// Request body for `/v1/patch`.
#[derive(Deserialize)]
struct PatchReq {
    /// Id of the stage the patch is applied to.
    stage_id: String,
    /// The patch to apply to that stage's AST.
    patch: lex_ast::Patch,
    /// Whether to activate the newly published stage; defaults to `false`.
    #[serde(default)] activate: bool,
}
335
/// Applies a patch to an existing stage and records the change on the
/// store's current branch.
///
/// Steps, in order: load the original AST, apply the patch, type-check the
/// patched stage on its own, publish it (and optionally activate it), then
/// apply a `Replace` transition whose operation kind is `ChangeEffectSig`
/// when the effect names differ and `ModifyBody` otherwise.
///
/// Responds with the old/new stage ids, the sig id, the new stage's status
/// and the op id. Errors: 400 bad request, 404 unknown stage, 422 patch or
/// type failure, 500 / `write_error_response` mapping for store failures.
fn patch_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
    let req: PatchReq = match serde_json::from_str(body) {
        Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
    };
    // The store stays locked for the rest of the handler.
    let store = state.store.lock().unwrap();

    let original = match store.get_ast(&req.stage_id) {
        Ok(s) => s, Err(e) => return error_response(404, format!("stage: {e}")),
    };

    let patched = match lex_ast::apply_patch(&original, &req.patch) {
        Ok(s) => s,
        Err(e) => return error_with_detail(422, "patch failed",
            serde_json::to_value(&e).unwrap_or_default()),
    };

    // Type-check the patched stage as a one-stage program.
    let stages = vec![patched.clone()];
    if let Err(errs) = lex_types::check_program(&stages) {
        return error_with_detail(422, "type errors after patch",
            serde_json::to_value(&errs).unwrap_or_default());
    }

    let branch = store.current_branch();

    let sig = match lex_ast::sig_id(&patched) {
        Some(s) => s,
        None => return error_response(500, "patched stage has no sig_id"),
    };

    // NOTE(review): the stage is published (and possibly activated) before
    // `apply_operation` runs below; if that later step fails the published
    // stage is not rolled back here — confirm this is intended.
    let new_id = match store.publish(&patched) {
        Ok(id) => id, Err(e) => return error_response(500, format!("publish: {e}")),
    };
    if req.activate {
        if let Err(e) = store.activate(&new_id) {
            return error_response(500, format!("activate: {e}"));
        }
    }

    // Effect names before and after the patch; non-function stages count as
    // having no effects.
    let original_effects: std::collections::BTreeSet<String> = match &original {
        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
        _ => std::collections::BTreeSet::new(),
    };
    let patched_effects: std::collections::BTreeSet<String> = match &patched {
        lex_ast::Stage::FnDecl(fd) => fd.effects.iter().map(|e| e.name.clone()).collect(),
        _ => std::collections::BTreeSet::new(),
    };
    // Current head op of the branch (if any) becomes the new op's parent.
    let head_now = match store.get_branch(&branch) {
        Ok(b) => b.and_then(|b| b.head_op),
        Err(e) => return error_response(500, format!("get_branch: {e}")),
    };
    let kind = if original_effects != patched_effects {
        let from_budget = lex_vcs::operation_budget_from_effects(&original_effects);
        let to_budget = lex_vcs::operation_budget_from_effects(&patched_effects);
        lex_vcs::OperationKind::ChangeEffectSig {
            sig_id: sig.clone(),
            from_stage_id: req.stage_id.clone(),
            to_stage_id: new_id.clone(),
            from_effects: original_effects,
            to_effects: patched_effects,
            from_budget,
            to_budget,
        }
    } else {
        // Effects are unchanged, so the same budget applies on both sides.
        let budget = lex_vcs::operation_budget_from_effects(&original_effects);
        lex_vcs::OperationKind::ModifyBody {
            sig_id: sig.clone(),
            from_stage_id: req.stage_id.clone(),
            to_stage_id: new_id.clone(),
            from_budget: budget,
            to_budget: budget,
        }
    };
    let transition = lex_vcs::StageTransition::Replace {
        sig_id: sig.clone(),
        from: req.stage_id.clone(),
        to: new_id.clone(),
    };
    let op = lex_vcs::Operation::new(
        kind,
        head_now.into_iter().collect::<Vec<_>>(),
    );
    let op_id = match store.apply_operation(&branch, op, transition) {
        Ok(id) => id,
        Err(e) => return write_error_response("apply_operation", e),
    };

    // Status is the lowercased Debug name; a failed lookup reports "draft".
    let status = format!("{:?}",
        store.get_status(&new_id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
    json_response(200, &serde_json::json!({
        "old_stage_id": req.stage_id,
        "new_stage_id": new_id,
        "sig_id": sig,
        "status": status,
        "op_id": op_id,
    }))
}
448
449pub(crate) fn stage_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
450 let store = state.store.lock().unwrap();
451 let meta = match store.get_metadata(id) {
452 Ok(m) => m, Err(e) => return error_response(404, format!("{e}")),
453 };
454 let ast = match store.get_ast(id) {
455 Ok(a) => a, Err(e) => return error_response(404, format!("{e}")),
456 };
457 let status = format!("{:?}", store.get_status(id).unwrap_or(lex_store::StageStatus::Draft)).to_lowercase();
458 json_response(200, &serde_json::json!({
459 "metadata": meta,
460 "ast": ast,
461 "status": status,
462 }))
463}
464
465pub(crate) fn stage_attestations_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
474 let store = state.store.lock().unwrap();
475 if let Err(e) = store.get_metadata(id) {
476 return error_response(404, format!("{e}"));
477 }
478 let log = match store.attestation_log() {
479 Ok(l) => l,
480 Err(e) => return error_response(500, format!("attestation log: {e}")),
481 };
482 let mut listing = match log.list_for_stage(&id.to_string()) {
483 Ok(v) => v,
484 Err(e) => return error_response(500, format!("list_for_stage: {e}")),
485 };
486 listing.sort_by_key(|a| std::cmp::Reverse(a.timestamp));
487 json_response(200, &serde_json::json!({"attestations": listing}))
488}
489
/// JSON form of a run policy; every field is optional and defaults to
/// empty / `None`. Converted to a runtime `Policy` by `into_policy`.
#[derive(Deserialize, Default)]
struct PolicyJson {
    /// Names of the effects the run may use.
    #[serde(default)] allow_effects: Vec<String>,
    /// Paths that become `Policy::allow_fs_read`.
    #[serde(default)] allow_fs_read: Vec<String>,
    /// Paths that become `Policy::allow_fs_write`.
    #[serde(default)] allow_fs_write: Vec<String>,
    /// Optional budget, passed through to `Policy::budget` unchanged.
    #[serde(default)] budget: Option<u64>,
}
497
498impl PolicyJson {
499 fn into_policy(self) -> Policy {
500 Policy {
501 allow_effects: self.allow_effects.into_iter().collect::<BTreeSet<_>>(),
502 allow_fs_read: self.allow_fs_read.into_iter().map(PathBuf::from).collect(),
503 allow_fs_write: self.allow_fs_write.into_iter().map(PathBuf::from).collect(),
504 allow_net_host: Vec::new(),
505 allow_proc: Vec::new(),
506 budget: self.budget,
507 }
508 }
509}
510
/// Request body for `/v1/run` and `/v1/replay`.
#[derive(Deserialize)]
struct RunReq {
    /// Program source text.
    source: String,
    /// Name of the function to call; spelled `fn` in the JSON.
    #[serde(rename = "fn")] func: String,
    /// Arguments for the call, as JSON values; defaults to none.
    #[serde(default)] args: Vec<serde_json::Value>,
    /// Policy the run is checked against; defaults to an empty policy.
    #[serde(default)] policy: PolicyJson,
    /// Overrides handed to the trace recorder when the handler runs with
    /// overrides enabled; defaults to empty.
    #[serde(default)] overrides: IndexMap<String, serde_json::Value>,
}
519
520pub(crate) fn run_handler(state: &State, body: &str, with_overrides: bool) -> Response<std::io::Cursor<Vec<u8>>> {
521 let req: RunReq = match serde_json::from_str(body) {
522 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
523 };
524 let prog = match load_program_from_str(&req.source) {
525 Ok(p) => p, Err(e) => return error_response(400, format!("syntax error: {e}")),
526 };
527 let stages = canonicalize_program(&prog);
528 if let Err(errs) = lex_types::check_program(&stages) {
529 return error_with_detail(422, "type errors", serde_json::to_value(&errs).unwrap());
530 }
531 let bc = compile_program(&stages);
532 let policy = req.policy.into_policy();
533 if let Err(violations) = check_policy(&bc, &policy) {
534 return error_with_detail(403, "policy violation", serde_json::to_value(&violations).unwrap());
535 }
536
537 let mut recorder = lex_trace::Recorder::new();
538 if with_overrides && !req.overrides.is_empty() {
539 recorder = recorder.with_overrides(req.overrides);
540 }
541 let handle = recorder.handle();
542 let handler = DefaultHandler::new(policy);
543 let mut vm = Vm::with_handler(&bc, Box::new(handler));
544 vm.set_tracer(Box::new(recorder));
545
546 let vargs: Vec<Value> = req.args.iter().map(json_to_value).collect();
547 let started = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
548 let result = vm.call(&req.func, vargs);
549 let ended = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
550
551 let store = state.store.lock().unwrap();
552 let (root_out, root_err, status) = match &result {
553 Ok(v) => (Some(value_to_json(v)), None, 200u16),
554 Err(e) => (None, Some(format!("{e}")), 200u16),
555 };
556 let tree = handle.finalize(req.func.clone(), serde_json::Value::Null,
557 root_out.clone(), root_err.clone(), started, ended);
558 let run_id = match store.save_trace(&tree) {
559 Ok(id) => id,
560 Err(e) => return error_response(500, format!("save_trace: {e}")),
561 };
562
563 let mut body = serde_json::json!({
564 "run_id": run_id,
565 "output": root_out,
566 });
567 if let Some(err) = root_err {
568 body["error"] = serde_json::Value::String(err);
569 }
570 json_response(status, &body)
571}
572
573fn trace_handler(state: &State, id: &str) -> Response<std::io::Cursor<Vec<u8>>> {
574 let store = state.store.lock().unwrap();
575 match store.load_trace(id) {
576 Ok(t) => json_response(200, &serde_json::to_value(&t).unwrap()),
577 Err(e) => error_response(404, format!("{e}")),
578 }
579}
580
581fn diff_handler(state: &State, query: &str) -> Response<std::io::Cursor<Vec<u8>>> {
582 let mut a = None;
583 let mut b = None;
584 for kv in query.split('&') {
585 if let Some((k, v)) = kv.split_once('=') {
586 match k { "a" => a = Some(v.to_string()), "b" => b = Some(v.to_string()), _ => {} }
587 }
588 }
589 let (Some(a), Some(b)) = (a, b) else {
590 return error_response(400, "missing a or b query params");
591 };
592 let store = state.store.lock().unwrap();
593 let ta = match store.load_trace(&a) { Ok(t) => t, Err(e) => return error_response(404, format!("a: {e}")) };
594 let tb = match store.load_trace(&b) { Ok(t) => t, Err(e) => return error_response(404, format!("b: {e}")) };
595 match lex_trace::diff_runs(&ta, &tb) {
596 Some(d) => json_response(200, &serde_json::to_value(&d).unwrap()),
597 None => json_response(200, &serde_json::json!({"divergence": null})),
598 }
599}
600
601fn json_to_value(v: &serde_json::Value) -> Value { Value::from_json(v) }
602
603fn value_to_json(v: &Value) -> serde_json::Value { v.to_json() }
604
/// Request body for `/v1/merge/start`: the names of the two branches to merge.
#[derive(Deserialize)]
struct MergeStartReq {
    /// Branch whose head is passed to the session as the source head.
    src_branch: String,
    /// Branch whose head is passed to the session as the destination head.
    dst_branch: String,
}
610
611fn merge_start_handler(state: &State, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
622 let req: MergeStartReq = match serde_json::from_str(body) {
623 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
624 };
625 let store = state.store.lock().unwrap();
626 let src_head = match store.get_branch(&req.src_branch) {
627 Ok(Some(b)) => b.head_op,
628 Ok(None) => return error_response(404, format!("unknown src branch `{}`", req.src_branch)),
629 Err(e) => return error_response(500, format!("src branch read: {e}")),
630 };
631 let dst_head = match store.get_branch(&req.dst_branch) {
632 Ok(Some(b)) => b.head_op,
633 Ok(None) => return error_response(404, format!("unknown dst branch `{}`", req.dst_branch)),
634 Err(e) => return error_response(500, format!("dst branch read: {e}")),
635 };
636 let log = match lex_vcs::OpLog::open(store.root()) {
637 Ok(l) => l,
638 Err(e) => return error_response(500, format!("op log: {e}")),
639 };
640 let merge_id = mint_merge_id();
644 let session = match MergeSession::start(
645 merge_id.clone(),
646 &log,
647 src_head.as_ref(),
648 dst_head.as_ref(),
649 ) {
650 Ok(s) => s,
651 Err(e) => return error_response(500, format!("merge start: {e}")),
652 };
653 let conflicts: Vec<&lex_vcs::ConflictRecord> = session.remaining_conflicts();
654 let auto_resolved_count = session.auto_resolved.len();
655 let body = serde_json::json!({
656 "merge_id": merge_id,
657 "src_head": session.src_head,
658 "dst_head": session.dst_head,
659 "lca": session.lca,
660 "conflicts": conflicts,
661 "auto_resolved_count": auto_resolved_count,
662 });
663 drop(conflicts);
664 drop(store);
665 let wrapped = ApiMergeSession {
666 inner: session,
667 src_branch: req.src_branch,
668 dst_branch: req.dst_branch,
669 };
670 state.sessions.lock().unwrap().insert(merge_id, wrapped);
671 json_response(200, &body)
672}
673
/// Request body for `/v1/merge/{id}/resolve`.
#[derive(Deserialize)]
struct MergeResolveReq {
    /// The resolutions to submit, one entry per conflict.
    resolutions: Vec<MergeResolveEntry>,
}
682
/// One resolution inside a `MergeResolveReq`.
#[derive(Deserialize)]
struct MergeResolveEntry {
    /// Id of the conflict being resolved.
    conflict_id: String,
    /// The resolution chosen for that conflict.
    resolution: lex_vcs::Resolution,
}
688
689fn merge_resolve_handler(
700 state: &State,
701 merge_id: &str,
702 body: &str,
703) -> Response<std::io::Cursor<Vec<u8>>> {
704 let req: MergeResolveReq = match serde_json::from_str(body) {
705 Ok(r) => r, Err(e) => return error_response(400, format!("bad request: {e}")),
706 };
707 let mut sessions = state.sessions.lock().unwrap();
708 let Some(wrapped) = sessions.get_mut(merge_id) else {
709 return error_response(404, format!("unknown merge_id `{merge_id}`"));
710 };
711 let pairs: Vec<(String, lex_vcs::Resolution)> = req.resolutions.into_iter()
712 .map(|e| (e.conflict_id, e.resolution))
713 .collect();
714 let verdicts = wrapped.inner.resolve(pairs);
715 let remaining: Vec<&lex_vcs::ConflictRecord> = wrapped.inner.remaining_conflicts();
716 let body = serde_json::json!({
717 "verdicts": verdicts,
718 "remaining_conflicts": remaining,
719 });
720 json_response(200, &body)
721}
722
/// Commits the merge session `merge_id` onto its destination branch.
///
/// The session is taken out of `State::sessions`, its auto-resolved `Src`
/// outcomes and its explicit resolutions are folded into a sig → stage
/// map, and a `Merge` operation with that map is applied to the destination
/// branch. Responds with the new head op and the destination branch name.
/// Errors: 404 unknown merge id, 422 when conflicts remain or a custom op
/// does not fit its conflict, 500 for internal failures, and the
/// `write_error_response` mapping when applying the operation fails.
fn merge_commit_handler(
    state: &State,
    merge_id: &str,
) -> Response<std::io::Cursor<Vec<u8>>> {
    use std::collections::BTreeMap;
    // NOTE(review): the session is removed from the map here and is not put
    // back on any of the early returns below (including the 422
    // "conflicts remaining" one) — confirm that dropping it is intended.
    let wrapped = match state.sessions.lock().unwrap().remove(merge_id) {
        Some(w) => w,
        None => return error_response(404, format!("unknown merge_id `{merge_id}`")),
    };
    // Copy what is needed out of the session before `commit()` below.
    let dst_branch = wrapped.dst_branch.clone();
    let src_head = wrapped.inner.src_head.clone();
    let dst_head = wrapped.inner.dst_head.clone();
    let auto_resolved = wrapped.inner.auto_resolved.clone();

    // Sig → resulting stage (or `None`) for the merge transition.
    let mut entries: BTreeMap<lex_vcs::SigId, Option<lex_vcs::StageId>> = BTreeMap::new();

    // Only auto-resolved `Src` outcomes contribute an entry.
    for outcome in &auto_resolved {
        if let lex_vcs::MergeOutcome::Src { sig_id, stage_id } = outcome {
            entries.insert(sig_id.clone(), stage_id.clone());
        }
    }

    let resolved = match wrapped.inner.commit() {
        Ok(r) => r,
        Err(lex_vcs::CommitError::ConflictsRemaining(ids)) => {
            return error_with_detail(
                422,
                "conflicts remaining",
                serde_json::json!({"unresolved": ids}),
            );
        }
    };

    for (conflict_id, resolution) in resolved {
        match resolution {
            // Taking ours adds no entry.
            lex_vcs::Resolution::TakeOurs => {
            }
            // Taking theirs uses whatever stage the source history ends on.
            lex_vcs::Resolution::TakeTheirs => {
                match resolve_take_theirs(state, &src_head, &conflict_id) {
                    Ok(stage_id) => {
                        entries.insert(conflict_id.clone(), stage_id);
                    }
                    Err(e) => return error_response(500, format!("resolve take_theirs: {e}")),
                }
            }
            // A custom op must target exactly the conflicting sig.
            lex_vcs::Resolution::Custom { op } => {
                match op.kind.merge_target() {
                    Some((sig, stage)) => {
                        if sig != conflict_id {
                            return error_with_detail(
                                422,
                                "custom op targets a different sig than the conflict",
                                serde_json::json!({
                                    "conflict_id": conflict_id,
                                    "op_targets": sig,
                                }),
                            );
                        }
                        entries.insert(conflict_id, stage);
                    }
                    None => {
                        return error_with_detail(
                            422,
                            "custom op kind doesn't yield a single sig→stage delta",
                            serde_json::json!({
                                "conflict_id": conflict_id,
                                "kind": serde_json::to_value(&op.kind).unwrap_or(serde_json::Value::Null),
                            }),
                        );
                    }
                }
            }
            // Reported as an internal error if it ever reaches this point.
            lex_vcs::Resolution::Defer => {
                return error_response(500, "internal: Defer slipped past commit gate");
            }
        }
    }

    let resolved_count = entries.len();
    // Parents are listed destination head first, then source head.
    let mut parents: Vec<lex_vcs::OpId> = Vec::new();
    if let Some(d) = dst_head { parents.push(d); }
    if let Some(s) = src_head { parents.push(s); }
    let op = lex_vcs::Operation::new(
        lex_vcs::OperationKind::Merge { resolved: resolved_count },
        parents,
    );
    let transition = lex_vcs::StageTransition::Merge { entries };
    let store = state.store.lock().unwrap();
    match store.apply_operation(&dst_branch, op, transition) {
        Ok(new_head_op) => json_response(200, &serde_json::json!({
            "new_head_op": new_head_op,
            "dst_branch": dst_branch,
        })),
        Err(e) => write_error_response("apply merge op", e),
    }
}
863
/// Determines which stage `sig` maps to after the operations returned by
/// `log.walk_forward(head, None)` for the source head.
///
/// Returns `Ok(None)` when there is no source head, or when the last
/// matching transition left the sig without a stage.
///
/// # Errors
/// Propagates I/O errors from opening the op log and from `walk_forward`.
fn resolve_take_theirs(
    state: &State,
    src_head: &Option<lex_vcs::OpId>,
    sig: &lex_vcs::SigId,
) -> std::io::Result<Option<lex_vcs::StageId>> {
    let store = state.store.lock().unwrap();
    let log = lex_vcs::OpLog::open(store.root())?;
    let Some(head) = src_head.as_ref() else { return Ok(None); };
    // Stage currently associated with `sig`; each matching record overwrites it.
    let mut current: Option<lex_vcs::StageId> = None;
    // presumably yields records oldest-first — TODO confirm against `OpLog::walk_forward`
    for record in log.walk_forward(head, None)? {
        match &record.produces {
            lex_vcs::StageTransition::Create { sig_id, stage_id }
                if sig_id == sig => { current = Some(stage_id.clone()); }
            lex_vcs::StageTransition::Replace { sig_id, to, .. }
                if sig_id == sig => { current = Some(to.clone()); }
            lex_vcs::StageTransition::Remove { sig_id, .. }
                if sig_id == sig => { current = None; }
            // A rename clears the sig when it is the old name and sets it to
            // the body stage when it is the new name.
            lex_vcs::StageTransition::Rename { from, to, body_stage_id }
                if from == sig || to == sig => {
                    if from == sig { current = None; }
                    if to == sig { current = Some(body_stage_id.clone()); }
                }
            // A merge only affects the sig when it carries an entry for it.
            lex_vcs::StageTransition::Merge { entries } => {
                if let Some(opt) = entries.get(sig) {
                    current = opt.clone();
                }
            }
            _ => {}
        }
    }
    Ok(current)
}
902
903fn mint_merge_id() -> MergeSessionId {
904 use std::sync::atomic::{AtomicU64, Ordering};
905 static COUNTER: AtomicU64 = AtomicU64::new(0);
906 let nanos = SystemTime::now()
907 .duration_since(UNIX_EPOCH)
908 .map(|d| d.as_nanos())
909 .unwrap_or(0);
910 let n = COUNTER.fetch_add(1, Ordering::Relaxed);
911 format!("merge_{nanos:x}_{n:x}")
912}
913
914pub(crate) fn ops_batch_handler(state: &State, body: &str)
945 -> Response<std::io::Cursor<Vec<u8>>>
946{
947 let records: Vec<lex_vcs::OperationRecord> = match serde_json::from_str(body) {
948 Ok(r) => r,
949 Err(e) => return error_response(400,
950 format!("body must be a JSON array of OperationRecord: {e}")),
951 };
952 let store = state.store.lock().unwrap();
953 let log = match lex_vcs::OpLog::open(store.root()) {
954 Ok(l) => l,
955 Err(e) => return error_response(500, format!("opening op log: {e}")),
956 };
957
958 let mut batch_ids: std::collections::BTreeSet<lex_vcs::OpId> =
966 std::collections::BTreeSet::new();
967 for rec in &records {
968 let expected = rec.op.op_id();
969 if expected != rec.op_id {
970 return error_with_detail(409, "OpIdMismatch", serde_json::json!({
971 "supplied": rec.op_id,
972 "expected": expected,
973 }));
974 }
975 for parent in &rec.op.parents {
976 let known = match log.get(parent) {
977 Ok(Some(_)) => true,
978 Ok(None) => false,
979 Err(e) => return error_response(500, format!("op log read: {e}")),
980 };
981 if !known && !batch_ids.contains(parent) {
982 return error_with_detail(422, "MissingParent", serde_json::json!({
983 "op_id": rec.op_id,
984 "missing_parent": parent,
985 }));
986 }
987 }
988 batch_ids.insert(rec.op_id.clone());
989 }
990
991 let mut added = 0usize;
994 let mut added_ids: Vec<&lex_vcs::OpId> = Vec::new();
995 for rec in &records {
996 let already_present = matches!(log.get(&rec.op_id), Ok(Some(_)));
997 match log.put(rec) {
998 Ok(()) => {
999 if !already_present {
1000 added += 1;
1001 added_ids.push(&rec.op_id);
1002 }
1003 }
1004 Err(e) => return error_response(500, format!("op log write: {e}")),
1005 }
1006 }
1007
1008 json_response(200, &serde_json::json!({
1009 "received": records.len(),
1010 "added": added,
1011 "skipped": records.len() - added,
1012 "added_ids": added_ids,
1013 }))
1014}
1015
1016pub(crate) fn attestations_batch_handler(state: &State, body: &str)
1038 -> Response<std::io::Cursor<Vec<u8>>>
1039{
1040 let attestations: Vec<lex_vcs::Attestation> = match serde_json::from_str(body) {
1041 Ok(a) => a,
1042 Err(e) => return error_response(400,
1043 format!("body must be a JSON array of Attestation: {e}")),
1044 };
1045 let store = state.store.lock().unwrap();
1046 let log = match store.attestation_log() {
1047 Ok(l) => l,
1048 Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1049 };
1050 let op_log = match lex_vcs::OpLog::open(store.root()) {
1051 Ok(l) => l,
1052 Err(e) => return error_response(500, format!("opening op log: {e}")),
1053 };
1054
1055 for att in &attestations {
1057 let expected = lex_vcs::Attestation::with_timestamp(
1060 att.stage_id.clone(),
1061 att.op_id.clone(),
1062 att.intent_id.clone(),
1063 att.kind.clone(),
1064 att.result.clone(),
1065 att.produced_by.clone(),
1066 att.cost.clone(),
1067 att.timestamp,
1068 ).attestation_id;
1069 if expected != att.attestation_id {
1070 return error_with_detail(409, "AttestationIdMismatch", serde_json::json!({
1071 "supplied": att.attestation_id,
1072 "expected": expected,
1073 }));
1074 }
1075 if let Some(op_id) = &att.op_id {
1079 match op_log.get(op_id) {
1080 Ok(Some(_)) => {}
1081 Ok(None) => return error_with_detail(422, "UnknownOp", serde_json::json!({
1082 "attestation_id": att.attestation_id,
1083 "op_id": op_id,
1084 })),
1085 Err(e) => return error_response(500, format!("op log read: {e}")),
1086 }
1087 }
1088 }
1089
1090 let mut added = 0usize;
1094 let mut added_ids: Vec<&lex_vcs::AttestationId> = Vec::new();
1095 for att in &attestations {
1096 let already_present = matches!(log.get(&att.attestation_id), Ok(Some(_)));
1097 match log.put(att) {
1098 Ok(()) => {
1099 if !already_present {
1100 added += 1;
1101 added_ids.push(&att.attestation_id);
1102 }
1103 }
1104 Err(e) => return error_response(500, format!("attestation log write: {e}")),
1105 }
1106 }
1107
1108 json_response(200, &serde_json::json!({
1109 "received": attestations.len(),
1110 "added": added,
1111 "skipped": attestations.len() - added,
1112 "added_ids": added_ids,
1113 }))
1114}
1115
1116pub(crate) fn branch_head_handler(state: &State, name: &str)
1125 -> Response<std::io::Cursor<Vec<u8>>>
1126{
1127 let store = state.store.lock().unwrap();
1128 let head = match store.get_branch(name) {
1129 Ok(Some(b)) => b.head_op,
1130 Ok(None) => None,
1131 Err(e) => return error_response(500, format!("get_branch: {e}")),
1132 };
1133 json_response(200, &serde_json::json!({
1134 "branch": name,
1135 "head_op": head,
1136 }))
1137}
1138
1139pub(crate) fn ops_since_handler(state: &State, query: &str)
1163 -> Response<std::io::Cursor<Vec<u8>>>
1164{
1165 let mut after: Option<String> = None;
1166 let mut branch = String::from("main");
1167 let mut limit: Option<usize> = None;
1168 for kv in query.split('&') {
1169 let Some((k, v)) = kv.split_once('=') else { continue };
1170 match k {
1171 "after" => after = Some(v.to_string()),
1172 "branch" => branch = v.to_string(),
1173 "limit" => {
1174 limit = Some(match v.parse::<usize>() {
1175 Ok(n) => n,
1176 Err(_) => return error_response(400,
1177 format!("limit must be a positive integer, got `{v}`")),
1178 });
1179 }
1180 _ => {}
1181 }
1182 }
1183
1184 let store = state.store.lock().unwrap();
1185 let log = match lex_vcs::OpLog::open(store.root()) {
1186 Ok(l) => l,
1187 Err(e) => return error_response(500, format!("opening op log: {e}")),
1188 };
1189 let head = match store.get_branch(&branch) {
1190 Ok(Some(b)) => b.head_op,
1191 Ok(None) => None,
1192 Err(e) => return error_response(500, format!("get_branch: {e}")),
1193 };
1194 let Some(head) = head else {
1195 return json_response(200, &serde_json::json!([]));
1196 };
1197
1198 let ops_since = match log.ops_since(&head, after.as_ref()) {
1199 Ok(o) => o,
1200 Err(e) => return error_response(500, format!("ops_since: {e}")),
1201 };
1202 let mut ops = ops_since;
1206 ops.reverse();
1207 if let Some(n) = limit {
1208 ops.truncate(n);
1209 }
1210
1211 json_response(200, &serde_json::to_value(&ops).unwrap_or_default())
1212}
1213
1214pub(crate) fn attestations_since_handler(state: &State, query: &str)
1227 -> Response<std::io::Cursor<Vec<u8>>>
1228{
1229 let mut after_op: Option<String> = None;
1230 let mut limit: Option<usize> = None;
1231 for kv in query.split('&') {
1232 let Some((k, v)) = kv.split_once('=') else { continue };
1233 match k {
1234 "after-op" => after_op = Some(v.to_string()),
1235 "limit" => {
1236 limit = Some(match v.parse::<usize>() {
1237 Ok(n) => n,
1238 Err(_) => return error_response(400,
1239 format!("limit must be a positive integer, got `{v}`")),
1240 });
1241 }
1242 _ => {}
1243 }
1244 }
1245
1246 let store = state.store.lock().unwrap();
1247 let log = match store.attestation_log() {
1248 Ok(l) => l,
1249 Err(e) => return error_response(500, format!("opening attestation log: {e}")),
1250 };
1251
1252 let exclude: std::collections::BTreeSet<String> = match &after_op {
1256 None => std::collections::BTreeSet::new(),
1257 Some(cutoff) => {
1258 let op_log = match lex_vcs::OpLog::open(store.root()) {
1259 Ok(l) => l,
1260 Err(e) => return error_response(500, format!("opening op log: {e}")),
1261 };
1262 match op_log.walk_back(cutoff, None) {
1263 Ok(records) => records.into_iter().map(|r| r.op_id).collect(),
1264 Err(_) => {
1265 std::collections::BTreeSet::new()
1269 }
1270 }
1271 }
1272 };
1273
1274 let all = match log.list_all() {
1275 Ok(v) => v,
1276 Err(e) => return error_response(500, format!("listing attestations: {e}")),
1277 };
1278 let mut filtered: Vec<lex_vcs::Attestation> = all
1279 .into_iter()
1280 .filter(|a| match &a.op_id {
1281 Some(op_id) => !exclude.contains(op_id),
1282 None => true,
1286 })
1287 .collect();
1288 filtered.sort_by(|a, b| {
1292 a.timestamp.cmp(&b.timestamp)
1293 .then_with(|| a.attestation_id.cmp(&b.attestation_id))
1294 });
1295 if let Some(n) = limit {
1296 filtered.truncate(n);
1297 }
1298
1299 json_response(200, &serde_json::to_value(&filtered).unwrap_or_default())
1300}
1301