use std::collections::HashMap;
use std::sync::Arc;
use serde::Deserialize;
use serde_json::{json, Value};
use tiny_http::{Header, Method, Response, Server};
use crate::context::{generate_request_id, Ctx};
use crate::registry::{FunctionConfig, Outcome, Registry};
const DEFAULT_PORT: u16 = 8080;
const REQUEST_ID_HEADER: &str = "x-atomfn-request-id";
struct ServerState {
registry: Registry,
project: String,
bundle: String,
}
#[derive(Deserialize)]
struct ConfigFile {
#[serde(default)]
functions: HashMap<String, FunctionConfig>,
}
fn load_config_file() -> HashMap<String, FunctionConfig> {
let path = std::env::var("ATOMFN_CONFIG")
.unwrap_or_else(|_| "/app/functions.config.json".to_string());
match std::fs::read_to_string(&path) {
Ok(content) => match serde_json::from_str::<ConfigFile>(&content) {
Ok(cfg) => cfg.functions,
Err(e) => {
eprintln!("警告:配置文件解析失败 {}: {}", path, e);
HashMap::new()
}
},
Err(_) => HashMap::new(),
}
}
pub fn serve(mut registry: Registry) {
let port: u16 = std::env::var("ATOMFN_PORT")
.ok()
.and_then(|value| value.parse().ok())
.unwrap_or(DEFAULT_PORT);
let project = std::env::var("ATOMFN_PROJECT").unwrap_or_else(|_| "default".to_string());
let bundle = std::env::var("ATOMFN_BUNDLE").unwrap_or_else(|_| "default".to_string());
registry.load_configs(load_config_file());
let state = Arc::new(ServerState {
registry,
project,
bundle,
});
let server = Server::http(format!("0.0.0.0:{}", port)).expect("无法绑定端口");
println!("atomfn 运行时已启动 :{}", port);
for request in server.incoming_requests() {
let state = Arc::clone(&state);
std::thread::spawn(move || {
let _ = handle_request(request, &state);
});
}
}
fn handle_request(
request: tiny_http::Request,
state: &ServerState,
) -> Result<(), Box<dyn std::error::Error>> {
let url = request.url().to_string();
let (path, query) = match url.find('?') {
Some(idx) => (url[..idx].to_string(), Some(url[idx + 1..].to_string())),
None => (url.clone(), None),
};
let request_id = request
.headers()
.iter()
.find(|h| h.field.as_str().to_ascii_lowercase() == REQUEST_ID_HEADER)
.map(|h| h.value.to_string());
match (request.method().clone(), path.as_str()) {
(Method::Get, "/__health") => {
let body = json!({ "ok": true, "functions": state.registry.functions() });
let response = json_response(200, &body, request_id.as_deref());
Ok(request.respond(response)?)
}
(Method::Post, p) if p.starts_with("/invoke/") => {
let name = p["/invoke/".len()..].to_string();
handle_invoke(request, state, &name, query.as_deref(), request_id)
}
_ => {
let response = Response::from_string("not found").with_status_code(404);
Ok(request.respond(response)?)
}
}
}
fn handle_invoke(
mut request: tiny_http::Request,
state: &ServerState,
name: &str,
query: Option<&str>,
request_id: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut event = json!({});
if let Some(query) = query {
for pair in query.split('&') {
if let Some((k, v)) = pair.split_once('=') {
let key = urldecode(k);
let val = urldecode(v);
event[key] = Value::String(val);
}
}
}
let mut body = Vec::new();
request.as_reader().read_to_end(&mut body)?;
if !body.is_empty() {
if let Ok(Value::Object(body_map)) = serde_json::from_slice::<Value>(&body) {
if let Value::Object(ref mut event_map) = event {
for (k, v) in body_map {
event_map.insert(k.clone(), v.clone());
}
}
}
}
let request_id = request_id.unwrap_or_else(generate_request_id);
let ctx = Ctx {
request_id: request_id.clone(),
project: state.project.clone(),
bundle: state.bundle.clone(),
function: name.to_string(),
};
if let Some(handler) = state.registry.one_shot.get(name) {
let outcome = handler(event, &ctx);
let (status, body) = map_outcome(outcome);
let response = json_response(status, &body, Some(&request_id));
return Ok(request.respond(response)?);
}
if let Some(handler) = state.registry.stream.get(name) {
let response = build_sse_response(handler(event, &ctx), &request_id);
return Ok(request.respond(response)?);
}
let body = system_error("NOT_FOUND", &format!("未找到函数 {}", name), false);
let response = json_response(404, &body, Some(&request_id));
Ok(request.respond(response)?)
}
fn map_outcome(outcome: Outcome) -> (u16, Value) {
match outcome {
Outcome::Ok(data) => (200, json!({ "ok": true, "data": data })),
Outcome::Business { code, data } => (
200,
json!({ "ok": false, "error": { "type": "business", "code": code, "data": data } }),
),
Outcome::Validation(message) => (422, system_error("VALIDATION_FAILED", &message, false)),
Outcome::Crash(message) => (500, system_error("FUNCTION_ERROR", &message, false)),
}
}
fn system_error(code: &str, message: &str, retryable: bool) -> Value {
json!({
"ok": false,
"error": { "type": "system", "code": code, "message": message, "retryable": retryable }
})
}
fn json_response(status: u16, body: &Value, request_id: Option<&str>) -> Response<std::io::Cursor<Vec<u8>>> {
let payload = body.to_string();
let mut response = Response::from_data(payload.into_bytes()).with_status_code(status);
response.add_header(Header::from_bytes(b"Content-Type", b"application/json").unwrap());
if let Some(id) = request_id {
response.add_header(Header::from_bytes(REQUEST_ID_HEADER, id).unwrap());
}
response
}
fn build_sse_response(
result: Result<crate::registry::StreamItems, Outcome>,
request_id: &str,
) -> Response<std::io::Cursor<Vec<u8>>> {
let mut buf = Vec::new();
match result {
Err(Outcome::Validation(message)) => {
let body = system_error("VALIDATION_FAILED", &message, false);
return json_response(422, &body, Some(request_id));
}
Err(outcome) => {
let frame = format!("event: error\ndata: {}\n\n", outcome_error_value(outcome));
buf.extend_from_slice(frame.as_bytes());
}
Ok(items) => {
for item in items {
let frame = format!("event: chunk\ndata: {}\n\n", item);
buf.extend_from_slice(frame.as_bytes());
}
let frame = format!("event: done\ndata: {}\n\n", json!({}));
buf.extend_from_slice(frame.as_bytes());
}
}
let mut response = Response::from_data(buf);
response.add_header(Header::from_bytes(b"Content-Type", b"text/event-stream").unwrap());
response.add_header(Header::from_bytes(b"Cache-Control", b"no-cache").unwrap());
response.add_header(Header::from_bytes(REQUEST_ID_HEADER, request_id).unwrap());
response
}
fn outcome_error_value(outcome: Outcome) -> Value {
match outcome {
Outcome::Business { code, data } => {
json!({ "type": "business", "code": code, "data": data })
}
Outcome::Crash(message) => {
json!({ "type": "system", "code": "FUNCTION_ERROR", "message": message, "retryable": false })
}
Outcome::Validation(message) => {
json!({ "type": "system", "code": "VALIDATION_FAILED", "message": message, "retryable": false })
}
Outcome::Ok(_) => {
json!({ "type": "system", "code": "FUNCTION_ERROR", "message": "", "retryable": false })
}
}
}
fn urldecode(s: &str) -> String {
let mut result = String::new();
let mut chars = s.chars();
while let Some(c) = chars.next() {
if c == '%' {
let hex: String = chars.by_ref().take(2).collect();
if hex.len() == 2 {
if let Ok(byte) = u8::from_str_radix(&hex, 16) {
result.push(byte as char);
continue;
}
}
result.push('%');
result.push_str(&hex);
} else if c == '+' {
result.push(' ');
} else {
result.push(c);
}
}
result
}