pub mod http {
use axum::{
extract::{Path, Query, State},
http::{HeaderMap, HeaderValue, StatusCode},
response::{Html, IntoResponse, Json},
routing::get,
Router,
};
use lemma::collect_lemma_sources as engine_collect_sources;
use lemma::DateTimeValue;
use lemma::Engine;
use lemma_cli::deps::{dependency_identifier_from_dependency_path, lemma_deps_dir};
use serde::Deserialize;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tower_http::cors::CorsLayer;
use tracing::{error, info, warn};
type SharedEngine = Arc<RwLock<Engine>>;
fn parse_spec_path(path: &str) -> String {
path.trim_matches('/').to_string()
}
fn accept_datetime_from_headers(
headers: &HeaderMap,
) -> Result<DateTimeValue, (StatusCode, Json<ErrorResponse>)> {
let raw = headers
.get("Accept-Datetime")
.and_then(|v| v.to_str().ok())
.map(|s| s.trim());
resolve_effective(raw)
}
#[derive(Deserialize, Default)]
struct EffectiveQuery {
effective: Option<String>,
}
#[derive(Deserialize, Default)]
struct SpecQuery {
rules: Option<String>,
}
fn resolve_effective(
raw: Option<&str>,
) -> Result<DateTimeValue, (StatusCode, Json<ErrorResponse>)> {
match raw {
Some(s) => s.parse::<DateTimeValue>().ok().ok_or_else(|| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Invalid effective value '{}'. Expected: YYYY, YYYY-MM, YYYY-MM-DD, or ISO 8601 datetime", s),
}),
)
}),
None => Ok(DateTimeValue::now()),
}
}
#[derive(Clone)]
struct AppState {
engine: SharedEngine,
explanations_enabled: bool,
}
#[derive(Debug, serde::Serialize)]
struct ErrorResponse {
error: String,
}
#[derive(serde::Serialize)]
struct GetSpecResponse {
spec_set_id: String,
effective_from: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
rules: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
meta: Option<serde_json::Value>,
versions: Vec<VersionEntry>,
}
#[derive(serde::Serialize)]
struct VersionEntry {
effective_from: Option<String>,
effective_to: Option<String>,
}
fn spec_response_headers(
effective_from: Option<&DateTimeValue>,
) -> Vec<(axum::http::header::HeaderName, HeaderValue)> {
let mut h = Vec::new();
if let Some(af) = effective_from {
if let Ok(v) = HeaderValue::from_str(&af.to_string()) {
h.push((
axum::http::header::HeaderName::from_static("memento-datetime"),
v,
));
}
}
h.push((
axum::http::header::VARY,
HeaderValue::from_static("Accept-Datetime"),
));
h
}
pub async fn start_server(
engine: Engine,
host: &str,
port: u16,
watch: bool,
explanations: bool,
workdir: PathBuf,
) -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "lemma=info,tower_http=info".into()),
)
.init();
let shared_engine: SharedEngine = Arc::new(RwLock::new(engine));
if watch {
start_file_watcher(shared_engine.clone(), workdir)?;
}
let state = AppState {
engine: shared_engine,
explanations_enabled: explanations,
};
let app = Router::new()
.route("/", get(list_specs))
.route("/health", get(health_check))
.route("/openapi.json", get(openapi_spec))
.route("/docs", get(scalar_docs))
.route("/scalar.js", get(scalar_js))
.route("/{*path}", get(spec_get_schema).post(spec_post_evaluate))
.fallback(fallback_404)
.layer(CorsLayer::permissive())
.with_state(state);
let addr: SocketAddr = format!("{host}:{port}").parse()?;
info!("Lemma server listening on http://{}", addr);
info!("Interactive docs at http://{}/docs", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
async fn list_specs(
State(state): State<AppState>,
Query(q): Query<EffectiveQuery>,
) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
let now = resolve_effective(q.effective.as_deref())?;
let engine = state.engine.read().await;
let specs: Vec<lemma::SpecSchema> = engine
.get_workspace()
.specs
.iter()
.filter_map(|ss| engine.schema(None, &ss.name, Some(&now)).ok())
.collect();
Ok(Json(specs))
}
async fn health_check() -> impl IntoResponse {
Json(serde_json::json!({
"status": "ok",
"service": "lemma",
"version": env!("CARGO_PKG_VERSION")
}))
}
async fn fallback_404() -> (StatusCode, Json<ErrorResponse>) {
(
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: "Not found. Use GET / for spec list, GET /docs for API docs.".to_string(),
}),
)
}
async fn openapi_spec(
State(state): State<AppState>,
Query(q): Query<EffectiveQuery>,
) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
let effective = resolve_effective(q.effective.as_deref())?;
let engine = state.engine.read().await;
let spec = lemma_openapi::generate_openapi_effective(
&engine,
state.explanations_enabled,
&effective,
);
Ok(Json(spec))
}
async fn scalar_docs(State(state): State<AppState>) -> impl IntoResponse {
let engine = state.engine.read().await;
let sources = lemma_openapi::temporal_api_sources(&engine);
let shared_opts = r#"layout: 'modern',
theme: 'solarized',
agent: { disabled: true },
hideClientButton: true,
hideTestRequestButton: false,
showSidebar: true,
showDeveloperTools: 'never',
operationTitleSource: 'summary',
persistAuth: false,
telemetry: false,
hideModels: true,
documentDownloadType: 'both', // Scalar UI option, not Lemma
hideSearch: false,
showOperationId: false,
hideDarkModeToggle: false,
withDefaultFonts: false,
defaultOpenAllTags: false,
expandAllModelSections: true,
expandAllResponses: true,
orderSchemaPropertiesBy: 'alpha',
orderRequiredPropertiesFirst: true,
customCss: `
a[href="https://www.scalar.com"] {
font-size: 0 !important;
}
a[href="https://www.scalar.com"]::after {
content: 'Powered by Lemma';
font-size: var(--scalar-mini, 10px);
}
`"#;
let config_js = if sources.len() == 1 {
format!("{{ url: '{}', {} }}", sources[0].url, shared_opts)
} else {
let sources_js: Vec<String> = sources
.iter()
.map(|s| {
format!(
"{{ title: '{}', slug: '{}', url: '{}' }}",
s.title, s.slug, s.url
)
})
.collect();
format!(
"{{ sources: [{}], {} }}",
sources_js.join(", "),
shared_opts
)
};
let html = format!(
r#"<!doctype html>
<html>
<head>
<title>Lemma API</title>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
</head>
<body>
<div id="app"></div>
<script src="/scalar.js"></script>
<script>
Scalar.createApiReference('#app', {config_js})
</script>
</body>
</html>"#
);
Html(html)
}
async fn scalar_js() -> impl IntoResponse {
static SCALAR_JS: &str = include_str!("../vendor/scalar-api-reference.js");
(
[(
axum::http::header::CONTENT_TYPE,
"application/javascript; charset=utf-8",
)],
SCALAR_JS,
)
}
async fn spec_get_schema(
State(state): State<AppState>,
Path(path): Path<String>,
Query(q): Query<SpecQuery>,
headers: HeaderMap,
) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
let spec_set_id = parse_spec_path(&path);
let effective = accept_datetime_from_headers(&headers)?;
let engine = state.engine.read().await;
let spec_name = lemma::parse_spec_set_id(&spec_set_id).map_err(|e| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let schema = engine
.schema(None, &spec_name, Some(&effective))
.map_err(|e| {
(
lemma_error_to_status(&e),
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let rule_names = q.rules.as_deref().map(parse_rule_names).unwrap_or_default();
let schema = if rule_names.is_empty() {
schema
} else {
let plan = engine
.get_plan(None, &spec_name, Some(&effective))
.map_err(|e| {
(
lemma_error_to_status(&e),
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
plan.schema_for_rules(&rule_names).map_err(|err| {
(
lemma_error_to_status(&err),
Json(ErrorResponse {
error: err.to_string(),
}),
)
})?
};
let spec_arc = engine.get_spec(&spec_name, Some(&effective)).map_err(|e| {
(
lemma_error_to_status(&e),
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let versions: Vec<VersionEntry> = engine
.get_workspace()
.specs
.iter()
.filter(|ss| ss.name == spec_arc.name)
.flat_map(|ss| ss.iter_with_ranges())
.map(|(_, effective_from, effective_to)| VersionEntry {
effective_from: effective_from.as_ref().map(|d| d.to_string()),
effective_to: effective_to.as_ref().map(|d| d.to_string()),
})
.collect();
let effective_from_str = spec_arc.effective_from().map(|d| d.to_string());
let body = GetSpecResponse {
spec_set_id,
effective_from: effective_from_str,
data: Some(
serde_json::to_value(&schema.data).expect("BUG: failed to serialize schema data"),
),
rules: Some(
serde_json::to_value(&schema.rules).expect("BUG: failed to serialize schema rules"),
),
meta: Some(
serde_json::to_value(&schema.meta).expect("BUG: failed to serialize schema meta"),
),
versions,
};
let mut response = Json(body).into_response();
let headers_mut = response.headers_mut();
for (k, v) in spec_response_headers(spec_arc.effective_from()) {
headers_mut.insert(k, v);
}
Ok(response)
}
async fn spec_post_evaluate(
State(state): State<AppState>,
Path(path): Path<String>,
Query(q): Query<SpecQuery>,
headers: HeaderMap,
body: Option<Json<std::collections::HashMap<String, serde_json::Value>>>,
) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
let spec_set_id = parse_spec_path(&path);
let effective = accept_datetime_from_headers(&headers)?;
let rule_names = q.rules.as_deref().map(parse_rule_names).unwrap_or_default();
let engine = state.engine.read().await;
let spec_name = lemma::parse_spec_set_id(&spec_set_id).map_err(|e| {
(
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: e.to_string(),
}),
)
})?;
let data_values: std::collections::HashMap<String, lemma::DataValueInput> = body
.map(|Json(map)| {
map.into_iter()
.filter(|(_, v)| !v.is_null())
.map(|(k, v)| {
crate::data_json::json_value_to_data_input(v).map(|input| (k, input))
})
.collect::<Result<_, _>>()
})
.transpose()
.map_err(|e| (StatusCode::BAD_REQUEST, Json(ErrorResponse { error: e })))?
.unwrap_or_default();
let plan = engine
.get_plan(None, &spec_name, Some(&effective))
.map_err(|err| {
(
lemma_error_to_status(&err),
Json(ErrorResponse {
error: err.to_string(),
}),
)
})?;
let include_explanations = want_explanations(&state, &headers);
let mut response = engine
.run_plan(
plan,
Some(&effective),
data_values,
include_explanations,
true,
)
.map_err(|err| {
(
lemma_error_to_status(&err),
Json(ErrorResponse {
error: err.to_string(),
}),
)
})?;
if !rule_names.is_empty() {
response.filter_rules(&rule_names);
}
let spec_arc = engine.get_spec(&spec_name, Some(&effective)).ok();
let effective_from = spec_arc.as_ref().and_then(|a| a.effective_from());
let mut payload = response.clone();
if !include_explanations {
payload.data.clear();
}
let mut axum_response = Json(payload).into_response();
let headers_mut = axum_response.headers_mut();
for (k, v) in spec_response_headers(effective_from) {
headers_mut.insert(k, v);
}
Ok(axum_response)
}
fn want_explanations(state: &AppState, headers: &HeaderMap) -> bool {
state.explanations_enabled
&& headers
.get("x-explanations")
.and_then(|v: &axum::http::HeaderValue| v.to_str().ok())
.map(|s: &str| !s.trim().is_empty())
.unwrap_or(false)
}
fn lemma_error_to_status(err: &lemma::Error) -> StatusCode {
use lemma::RequestErrorKind;
match err {
lemma::Error::Request {
kind: RequestErrorKind::SpecNotFound,
..
} => StatusCode::NOT_FOUND,
_ => StatusCode::BAD_REQUEST,
}
}
fn parse_rule_names(rules_segment: &str) -> Vec<String> {
rules_segment
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty() && s != "{rules}")
.collect()
}
type ModifiedSnapshot = std::collections::BTreeMap<PathBuf, std::time::SystemTime>;
fn collect_modified_times(workdir: &std::path::Path) -> ModifiedSnapshot {
use walkdir::WalkDir;
let mut snapshot = std::collections::BTreeMap::new();
for entry in WalkDir::new(workdir).into_iter().flatten() {
if entry.path().extension().and_then(|s| s.to_str()) == Some("lemma") {
if let Ok(metadata) = entry.path().metadata() {
if let Ok(modified) = metadata.modified() {
snapshot.insert(entry.path().to_path_buf(), modified);
}
}
}
}
snapshot
}
fn start_file_watcher(shared_engine: SharedEngine, workdir: PathBuf) -> anyhow::Result<()> {
use notify_debouncer_mini::new_debouncer;
use std::sync::Mutex;
use std::time::Duration;
let watch_dir = workdir.clone();
let last_snapshot: Arc<Mutex<ModifiedSnapshot>> =
Arc::new(Mutex::new(collect_modified_times(&workdir)));
let mut debouncer = new_debouncer(
Duration::from_millis(500),
move |result: Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>| {
match result {
Ok(events) => {
let has_lemma_events = events.iter().any(|event| {
event
.path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext == "lemma")
.unwrap_or(false)
});
if !has_lemma_events {
return;
}
let current_snapshot = collect_modified_times(&workdir);
let files_changed = {
let previous = match last_snapshot.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
current_snapshot != *previous
};
if !files_changed {
return;
}
{
let mut previous = match last_snapshot.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
*previous = current_snapshot;
}
info!("Detected .lemma file changes, reloading...");
let engine_clone = shared_engine.clone();
let workdir_clone = workdir.clone();
std::thread::spawn(move || {
let runtime = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(err) => {
error!("Failed to create tokio runtime for reload: {}", err);
return;
}
};
runtime.block_on(async {
match reload_engine(&workdir_clone).await {
Ok(new_engine) => {
let spec_count = new_engine.get_workspace().specs.len();
let mut engine = engine_clone.write().await;
*engine = new_engine;
info!("Reloaded engine with {} spec(s)", spec_count);
}
Err(err) => {
warn!("Reload failed (keeping previous state): {}", err);
}
}
});
});
}
Err(err) => {
error!("File watcher error: {}", err);
}
}
},
)?;
debouncer
.watcher()
.watch(&watch_dir, notify::RecursiveMode::Recursive)?;
info!("Watching {:?} for .lemma file changes", watch_dir);
std::mem::forget(debouncer);
Ok(())
}
async fn reload_engine(workdir: &std::path::Path) -> anyhow::Result<Engine> {
use walkdir::WalkDir;
let mut engine = Engine::new();
let deps_dir = lemma_deps_dir(workdir);
let mut workspace_paths: Vec<std::path::PathBuf> = Vec::new();
let mut deps_paths: Vec<std::path::PathBuf> = Vec::new();
for entry in WalkDir::new(workdir) {
let entry = entry?;
if entry.path().extension().and_then(|s| s.to_str()) != Some("lemma") {
continue;
}
if entry.path().starts_with(&deps_dir) {
deps_paths.push(entry.path().to_path_buf());
} else {
workspace_paths.push(entry.path().to_path_buf());
}
}
for dep_path in &deps_paths {
let dependency_id = dependency_identifier_from_dependency_path(workdir, dep_path);
let sources = match engine_collect_sources(std::slice::from_ref(dep_path)) {
Ok(s) => s,
Err(e) => {
for err in e.iter() {
tracing::error!(
"{}",
crate::error_formatter::format_error(err, &e.sources)
);
}
anyhow::bail!("Workspace load failed ({} error(s))", e.errors.len());
}
};
if let Err(load_err) = engine.load_batch(sources, Some(&dependency_id)) {
for err in load_err.iter() {
tracing::error!(
"{}",
crate::error_formatter::format_error(err, &load_err.sources)
);
}
anyhow::bail!("Workspace load failed ({} error(s))", load_err.errors.len());
}
}
let sources = match engine_collect_sources(&workspace_paths) {
Ok(s) => s,
Err(e) => {
for err in e.iter() {
tracing::error!("{}", crate::error_formatter::format_error(err, &e.sources));
}
anyhow::bail!("Workspace load failed ({} error(s))", e.errors.len());
}
};
if let Err(load_err) = engine.load_batch(sources, None) {
for err in load_err.iter() {
tracing::error!(
"{}",
crate::error_formatter::format_error(err, &load_err.sources)
);
}
anyhow::bail!("Workspace load failed ({} error(s))", load_err.errors.len());
}
Ok(engine)
}
}