atomfn 0.1.4

AtomService 函数服务 Rust SDK:与 TS SDK 协议一致的常驻 HTTP 运行时
Documentation
use std::collections::HashMap;
use std::sync::Arc;

use serde::Deserialize;
use serde_json::{json, Value};
use tiny_http::{Header, Method, Response, Server};

use crate::context::{generate_request_id, Ctx};
use crate::registry::{FunctionConfig, Outcome, Registry};

/// Port the HTTP server binds to when `ATOMFN_PORT` is unset or does not parse as a `u16`.
const DEFAULT_PORT: u16 = 8080;
/// Name of the request-id header. Incoming header names are lower-cased before being
/// compared against it, and it is attached to JSON and SSE responses.
const REQUEST_ID_HEADER: &str = "x-atomfn-request-id";

/// Read-only state shared by all request-handling threads (`serve` wraps it in an `Arc`).
struct ServerState {
    /// Function registry; its `one_shot` and `stream` tables are looked up by function
    /// name on each invocation, and `functions()` is reported by the health endpoint.
    registry: Registry,
    /// Project name copied into every `Ctx`; `serve` reads it from `ATOMFN_PROJECT`
    /// and falls back to `"default"`.
    project: String,
    /// Bundle name copied into every `Ctx`; `serve` reads it from `ATOMFN_BUNDLE`
    /// and falls back to `"default"`.
    bundle: String,
}

/// Shape of the JSON configuration file parsed by `load_config_file`.
#[derive(Deserialize)]
struct ConfigFile {
    /// Per-function configuration entries. Defaults to an empty map when the
    /// `functions` key is absent from the file.
    /// NOTE(review): keys are presumably function names — confirm against
    /// `Registry::load_configs`.
    #[serde(default)]
    functions: HashMap<String, FunctionConfig>,
}

/// Loads the per-function configuration map from the JSON config file.
///
/// The path is taken from the `ATOMFN_CONFIG` environment variable and falls back to
/// `/app/functions.config.json`. The file is optional, so a missing file silently
/// yields an empty map. Any other read failure (permissions, I/O error, invalid
/// UTF-8) and any parse failure is reported on stderr and also yields an empty map,
/// so configuration problems never abort startup but are no longer invisible.
fn load_config_file() -> HashMap<String, FunctionConfig> {
    let path = std::env::var("ATOMFN_CONFIG")
        .unwrap_or_else(|_| "/app/functions.config.json".to_string());

    let content = match std::fs::read_to_string(&path) {
        Ok(content) => content,
        // A missing file is the normal "no configuration" case and stays quiet.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return HashMap::new(),
        // The file exists but could not be read: surface it instead of hiding it.
        Err(e) => {
            eprintln!("警告:配置文件读取失败 {}: {}", path, e);
            return HashMap::new();
        }
    };

    match serde_json::from_str::<ConfigFile>(&content) {
        Ok(cfg) => cfg.functions,
        Err(e) => {
            eprintln!("警告:配置文件解析失败 {}: {}", path, e);
            HashMap::new()
        }
    }
}

/// Starts the HTTP runtime and serves requests until the listener stops yielding them.
///
/// Environment variables read at startup:
/// - `ATOMFN_PORT`: listening port; falls back to `DEFAULT_PORT` when unset or not a valid `u16`.
/// - `ATOMFN_PROJECT` / `ATOMFN_BUNDLE`: values placed in every request context; both
///   fall back to `"default"`.
///
/// The configuration returned by `load_config_file` is handed to the registry before
/// the server starts. Each incoming request is handled on its own freshly spawned
/// thread; a failure while handling one request is logged to stderr and does not
/// affect other requests.
///
/// # Panics
///
/// Panics if the listening socket cannot be bound.
pub fn serve(mut registry: Registry) {
    let port: u16 = std::env::var("ATOMFN_PORT")
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(DEFAULT_PORT);
    let project = std::env::var("ATOMFN_PROJECT").unwrap_or_else(|_| "default".to_string());
    let bundle = std::env::var("ATOMFN_BUNDLE").unwrap_or_else(|_| "default".to_string());

    registry.load_configs(load_config_file());

    let state = Arc::new(ServerState {
        registry,
        project,
        bundle,
    });

    let server = Server::http(format!("0.0.0.0:{}", port)).expect("无法绑定端口");
    println!("atomfn 运行时已启动 :{}", port);

    for request in server.incoming_requests() {
        let state = Arc::clone(&state);
        std::thread::spawn(move || {
            // Errors here are per-request I/O failures (e.g. the client went away);
            // report them rather than dropping them silently.
            if let Err(e) = handle_request(request, &state) {
                eprintln!("警告:请求处理失败: {}", e);
            }
        });
    }
}

/// Routes a single HTTP request.
///
/// - `GET /__health` answers with a JSON document listing the registered functions.
/// - `POST /invoke/<name>` is forwarded to `handle_invoke` together with the raw
///   query string and the request id taken from the incoming headers, if any.
/// - Anything else receives a plain-text 404.
///
/// Returns an error when writing the response (or, for invocations, reading the
/// request) fails.
fn handle_request(
    request: tiny_http::Request,
    state: &ServerState,
) -> Result<(), Box<dyn std::error::Error>> {
    // Own a copy of the URL so the borrowed path/query slices do not tie up `request`.
    let url = request.url().to_string();
    let (path, query) = match url.split_once('?') {
        Some((before, after)) => (before, Some(after)),
        None => (url.as_str(), None),
    };

    // First header whose lower-cased name matches the request-id header wins.
    let mut request_id: Option<String> = None;
    for header in request.headers() {
        if header.field.as_str().to_ascii_lowercase() == REQUEST_ID_HEADER {
            request_id = Some(header.value.to_string());
            break;
        }
    }

    let is_get = matches!(request.method(), Method::Get);
    let is_post = matches!(request.method(), Method::Post);

    if is_get && path == "/__health" {
        let body = json!({ "ok": true, "functions": state.registry.functions() });
        let response = json_response(200, &body, request_id.as_deref());
        request.respond(response)?;
        return Ok(());
    }

    if is_post {
        if let Some(name) = path.strip_prefix("/invoke/") {
            return handle_invoke(request, state, name, query, request_id);
        }
    }

    let response = Response::from_string("not found").with_status_code(404);
    request.respond(response)?;
    Ok(())
}

/// Invokes the function `name` and writes its result back to the client.
///
/// The event passed to the handler is a JSON object assembled from two sources:
/// 1. URL-decoded `key=value` pairs of the query string (values are always strings;
///    pairs without `=` are skipped), then
/// 2. the top-level fields of the request body when it parses as a JSON object.
///    Body fields overwrite query parameters with the same key. A body that is
///    empty, invalid JSON, or not an object contributes nothing.
///
/// `request_id` is the id supplied by the caller; when absent a new one is generated.
/// One-shot handlers are looked up first and answered with a JSON response, stream
/// handlers second and answered with an SSE response. An unknown name yields a 404
/// system error with code `NOT_FOUND`.
///
/// Returns an error when reading the body or writing the response fails.
fn handle_invoke(
    mut request: tiny_http::Request,
    state: &ServerState,
    name: &str,
    query: Option<&str>,
    request_id: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut fields = serde_json::Map::new();

    if let Some(query) = query {
        fields.extend(
            query
                .split('&')
                .filter_map(|pair| pair.split_once('='))
                .map(|(k, v)| (urldecode(k), Value::String(urldecode(v)))),
        );
    }

    let mut body = Vec::new();
    request.as_reader().read_to_end(&mut body)?;
    if !body.is_empty() {
        if let Ok(Value::Object(body_map)) = serde_json::from_slice::<Value>(&body) {
            // The parsed map is owned, so its entries are moved in rather than cloned.
            fields.extend(body_map);
        }
    }
    let event = Value::Object(fields);

    let request_id = request_id.unwrap_or_else(generate_request_id);
    let ctx = Ctx {
        request_id: request_id.clone(),
        project: state.project.clone(),
        bundle: state.bundle.clone(),
        function: name.to_string(),
    };

    if let Some(handler) = state.registry.one_shot.get(name) {
        let (status, body) = map_outcome(handler(event, &ctx));
        let response = json_response(status, &body, Some(&request_id));
        return Ok(request.respond(response)?);
    }

    if let Some(handler) = state.registry.stream.get(name) {
        let response = build_sse_response(handler(event, &ctx), &request_id);
        return Ok(request.respond(response)?);
    }

    let body = system_error("NOT_FOUND", &format!("未找到函数 {}", name), false);
    let response = json_response(404, &body, Some(&request_id));
    Ok(request.respond(response)?)
}

fn map_outcome(outcome: Outcome) -> (u16, Value) {
    match outcome {
        Outcome::Ok(data) => (200, json!({ "ok": true, "data": data })),
        Outcome::Business { code, data } => (
            200,
            json!({ "ok": false, "error": { "type": "business", "code": code, "data": data } }),
        ),
        Outcome::Validation(message) => (422, system_error("VALIDATION_FAILED", &message, false)),
        Outcome::Crash(message) => (500, system_error("FUNCTION_ERROR", &message, false)),
    }
}

fn system_error(code: &str, message: &str, retryable: bool) -> Value {
    json!({
        "ok": false,
        "error": { "type": "system", "code": code, "message": message, "retryable": retryable }
    })
}

/// Serializes `body` into an `application/json` response with the given status code.
///
/// When `request_id` is provided it is attached as the request-id header.
///
/// # Panics
///
/// Panics if `request_id` cannot be turned into a header value.
fn json_response(status: u16, body: &Value, request_id: Option<&str>) -> Response<std::io::Cursor<Vec<u8>>> {
    let content_type = Header::from_bytes(b"Content-Type", b"application/json").unwrap();
    let response = Response::from_data(body.to_string().into_bytes())
        .with_status_code(status)
        .with_header(content_type);

    match request_id {
        Some(id) => response.with_header(Header::from_bytes(REQUEST_ID_HEADER, id).unwrap()),
        None => response,
    }
}

fn build_sse_response(
    result: Result<crate::registry::StreamItems, Outcome>,
    request_id: &str,
) -> Response<std::io::Cursor<Vec<u8>>> {
    let mut buf = Vec::new();

    match result {
        Err(Outcome::Validation(message)) => {
            let body = system_error("VALIDATION_FAILED", &message, false);
            return json_response(422, &body, Some(request_id));
        }
        Err(outcome) => {
            let frame = format!("event: error\ndata: {}\n\n", outcome_error_value(outcome));
            buf.extend_from_slice(frame.as_bytes());
        }
        Ok(items) => {
            for item in items {
                let frame = format!("event: chunk\ndata: {}\n\n", item);
                buf.extend_from_slice(frame.as_bytes());
            }
            let frame = format!("event: done\ndata: {}\n\n", json!({}));
            buf.extend_from_slice(frame.as_bytes());
        }
    }

    let mut response = Response::from_data(buf);
    response.add_header(Header::from_bytes(b"Content-Type", b"text/event-stream").unwrap());
    response.add_header(Header::from_bytes(b"Cache-Control", b"no-cache").unwrap());
    response.add_header(Header::from_bytes(REQUEST_ID_HEADER, request_id).unwrap());
    response
}

/// Converts an outcome into the bare error object carried by an SSE `error` event.
///
/// `Business` outcomes keep their code and data. `Crash` and `Validation` map to
/// system errors with codes `FUNCTION_ERROR` and `VALIDATION_FAILED`. An `Ok`
/// outcome, which is not an error, is reported as a `FUNCTION_ERROR` with an
/// empty message.
fn outcome_error_value(outcome: Outcome) -> Value {
    /// Non-retryable system error payload shared by the non-business variants.
    fn system_payload(code: &str, message: &str) -> Value {
        json!({ "type": "system", "code": code, "message": message, "retryable": false })
    }

    match outcome {
        Outcome::Ok(_) => system_payload("FUNCTION_ERROR", ""),
        Outcome::Validation(message) => system_payload("VALIDATION_FAILED", &message),
        Outcome::Crash(message) => system_payload("FUNCTION_ERROR", &message),
        Outcome::Business { code, data } => {
            json!({ "type": "business", "code": code, "data": data })
        }
    }
}

/// Decodes a query-string component (`application/x-www-form-urlencoded` style).
///
/// - `+` becomes a space.
/// - `%XY`, where `X` and `Y` are hexadecimal digits, becomes the byte `0xXY`.
/// - A `%` that is not followed by two hexadecimal digits is kept literally and
///   decoding resumes with the next character.
///
/// Decoding happens at the byte level and the result is interpreted as UTF-8 once
/// at the end, so multi-byte sequences such as `%E4%B8%AD` decode to the intended
/// character. Byte sequences that are not valid UTF-8 are replaced with U+FFFD.
fn urldecode(s: &str) -> String {
    /// Numeric value of a single ASCII hexadecimal digit.
    fn hex_val(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    let bytes = s.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_val);
                let lo = bytes.get(i + 2).copied().and_then(hex_val);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Malformed escape: keep the '%' and let the following bytes be
                // processed normally.
                decoded.push(b'%');
            }
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }

    String::from_utf8_lossy(&decoded).into_owned()
}