atomfn 0.1.0

AtomService 函数服务 Rust SDK:与 TS SDK 协议一致的常驻 HTTP 运行时
Documentation
use std::collections::HashMap;

use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;

use crate::context::{Ctx, FunctionError};

/// Result of executing a function, detached from the user's own types and
/// unified as JSON values.
pub enum Outcome {
    /// Success, carrying the handler's output as a JSON value.
    Ok(Value),
    /// Error carrying a `code` string and a JSON `data` payload; mapped from
    /// `FunctionError::Business`.
    Business { code: String, data: Value },
    /// Validation failure with a message; mapped from
    /// `FunctionError::Validation`, and also produced by the registry wrappers
    /// when the input JSON cannot be deserialized.
    Validation(String),
    /// Crash with a message; mapped from `FunctionError::Crash`, and also
    /// produced by the one-shot wrapper when the output cannot be serialized.
    Crash(String),
}

/// Converts a handler-side [`FunctionError`] into the matching [`Outcome`]
/// variant, moving the payload across unchanged.
fn map_error(error: FunctionError) -> Outcome {
    match error {
        FunctionError::Crash(reason) => Outcome::Crash(reason),
        FunctionError::Business { code, data } => Outcome::Business { code, data },
        FunctionError::Validation(reason) => Outcome::Validation(reason),
    }
}

/// Boxed, `Send` iterator of JSON values yielded by a stream function.
pub type StreamItems = Box<dyn Iterator<Item = Value> + Send>;

/// Type-erased one-shot handler: takes the raw JSON input and a context and
/// returns an [`Outcome`].
type OneShotFn = Box<dyn Fn(Value, &Ctx) -> Outcome + Send + Sync>;
/// Type-erased stream handler: takes the raw JSON input and a context and
/// returns either the item iterator or the [`Outcome`] describing the failure.
type StreamFn = Box<dyn Fn(Value, &Ctx) -> Result<StreamItems, Outcome> + Send + Sync>;

/// Function registry. Handlers are registered through `function` /
/// `stream_function`.
#[derive(Default)]
pub struct Registry {
    /// One-shot handlers, keyed by the name they were registered under.
    pub(crate) one_shot: HashMap<String, OneShotFn>,
    /// Stream handlers, keyed by the name they were registered under.
    pub(crate) stream: HashMap<String, StreamFn>,
}

impl Registry {
    pub fn new() -> Self {
        Registry::default()
    }

    /// 注册同步一次性函数。
    pub fn function<I, O, F>(&mut self, name: &str, handler: F) -> &mut Self
    where
        I: DeserializeOwned,
        O: Serialize,
        F: Fn(I, &Ctx) -> Result<O, FunctionError> + Send + Sync + 'static,
    {
        let wrapped: OneShotFn = Box::new(move |value, ctx| {
            let input: I = match serde_json::from_value(value) {
                Ok(parsed) => parsed,
                Err(error) => return Outcome::Validation(error.to_string()),
            };
            match handler(input, ctx) {
                Ok(output) => match serde_json::to_value(output) {
                    Ok(value) => Outcome::Ok(value),
                    Err(error) => Outcome::Crash(error.to_string()),
                },
                Err(error) => map_error(error),
            }
        });
        self.one_shot.insert(name.to_string(), wrapped);
        self
    }

    /// 注册流式函数(处理器返回逐块产出的迭代器)。
    pub fn stream_function<I, O, IT, F>(&mut self, name: &str, handler: F) -> &mut Self
    where
        I: DeserializeOwned,
        O: Serialize + 'static,
        IT: Iterator<Item = O> + Send + 'static,
        F: Fn(I, &Ctx) -> Result<IT, FunctionError> + Send + Sync + 'static,
    {
        let wrapped: StreamFn = Box::new(move |value, ctx| {
            let input: I = serde_json::from_value(value)
                .map_err(|error| Outcome::Validation(error.to_string()))?;
            let iterator = handler(input, ctx).map_err(map_error)?;
            let mapped = iterator.map(|item| serde_json::to_value(item).unwrap_or(Value::Null));
            Ok(Box::new(mapped) as StreamItems)
        });
        self.stream.insert(name.to_string(), wrapped);
        self
    }

    pub fn names(&self) -> Vec<String> {
        let mut names: Vec<String> = self
            .one_shot
            .keys()
            .chain(self.stream.keys())
            .cloned()
            .collect();
        names.sort();
        names
    }

    /// 返回函数列表,格式为 `[{name, kind}]`,与 Bun SDK 一致。
    pub fn functions(&self) -> Vec<serde_json::Value> {
        let mut list: Vec<serde_json::Value> = Vec::new();
        for name in self.one_shot.keys() {
            list.push(serde_json::json!({ "name": name, "kind": "one_shot" }));
        }
        for name in self.stream.keys() {
            list.push(serde_json::json!({ "name": name, "kind": "stream" }));
        }
        list.sort_by(|a, b| a["name"].as_str().cmp(&b["name"].as_str()));
        list
    }
}