// atomfn 0.1.4
//
// AtomService function-service Rust SDK: a long-running HTTP runtime that follows the same protocol as the TS SDK.
// Documentation
use std::collections::HashMap;

use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use validator::Validate;

use crate::context::{Ctx, FunctionError};

/// Per-function settings that can be deserialized with serde.
///
/// Both fields are optional; the derived `Default` leaves each one as `None`.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct FunctionConfig {
    /// Optional timeout for the function.
    /// NOTE(review): the time unit is not visible in this file — confirm against the runtime.
    pub timeout: Option<u64>,
    /// Optional concurrency limit for the function.
    /// NOTE(review): presumably enforced by the runtime elsewhere — verify.
    pub max_concurrency: Option<u32>,
}

/// Result of running a registered handler, expressed with JSON values.
pub enum Outcome {
    /// The handler succeeded and its output was serialized to JSON.
    Ok(Value),
    /// The handler reported a business error carrying a code and a JSON payload.
    Business { code: String, data: Value },
    /// The input could not be deserialized or failed validation, or the handler
    /// itself reported a validation error; the string is the error message.
    Validation(String),
    /// The handler reported a crash, or its output could not be serialized;
    /// the string is the error message.
    Crash(String),
}

/// Converts a handler-level [`FunctionError`] into the matching [`Outcome`] variant.
///
/// The mapping is one-to-one: every payload is moved across unchanged.
fn map_error(failure: FunctionError) -> Outcome {
    match failure {
        FunctionError::Crash(reason) => Outcome::Crash(reason),
        FunctionError::Business { code, data } => Outcome::Business { code, data },
        FunctionError::Validation(reason) => Outcome::Validation(reason),
    }
}

/// Boxed, sendable iterator of JSON values produced by a stream function.
pub type StreamItems = Box<dyn Iterator<Item = Value> + Send>;

/// Type-erased one-shot handler: takes the raw JSON input and a context, returns an `Outcome`.
type OneShotFn = Box<dyn Fn(Value, &Ctx) -> Outcome + Send + Sync>;
/// Type-erased stream handler: takes the raw JSON input and a context and returns either
/// an iterator of JSON items or the `Outcome` describing why the stream could not start.
type StreamFn = Box<dyn Fn(Value, &Ctx) -> Result<StreamItems, Outcome> + Send + Sync>;

/// Collection of registered handlers and per-function configs, each keyed by function name.
#[derive(Default)]
pub struct Registry {
    /// One-shot handlers, keyed by function name.
    pub(crate) one_shot: HashMap<String, OneShotFn>,
    /// Stream handlers, keyed by function name.
    pub(crate) stream: HashMap<String, StreamFn>,
    /// Per-function configuration, keyed by function name.
    pub(crate) configs: HashMap<String, FunctionConfig>,
}

impl Registry {
    pub fn new() -> Self {
        Registry::default()
    }

    pub fn function<I, O, F>(&mut self, name: &str, handler: F) -> &mut Self
    where
        I: DeserializeOwned + Validate,
        O: Serialize,
        F: Fn(I, &Ctx) -> Result<O, FunctionError> + Send + Sync + 'static,
    {
        let wrapped: OneShotFn = Box::new(move |value, ctx| {
            let input: I = match serde_json::from_value(value) {
                Ok(parsed) => parsed,
                Err(error) => return Outcome::Validation(error.to_string()),
            };
            if let Err(errors) = input.validate() {
                return Outcome::Validation(errors.to_string());
            }
            match handler(input, ctx) {
                Ok(output) => match serde_json::to_value(output) {
                    Ok(value) => Outcome::Ok(value),
                    Err(error) => Outcome::Crash(error.to_string()),
                },
                Err(error) => map_error(error),
            }
        });
        self.one_shot.insert(name.to_string(), wrapped);
        self
    }

    pub fn stream_function<I, O, IT, F>(&mut self, name: &str, handler: F) -> &mut Self
    where
        I: DeserializeOwned + Validate,
        O: Serialize + 'static,
        IT: Iterator<Item = O> + Send + 'static,
        F: Fn(I, &Ctx) -> Result<IT, FunctionError> + Send + Sync + 'static,
    {
        let wrapped: StreamFn = Box::new(move |value, ctx| {
            let input: I = serde_json::from_value(value)
                .map_err(|error| Outcome::Validation(error.to_string()))?;
            if let Err(errors) = input.validate() {
                return Err(Outcome::Validation(errors.to_string()));
            }
            let iterator = handler(input, ctx).map_err(map_error)?;
            let mapped = iterator.map(|item| serde_json::to_value(item).unwrap_or(Value::Null));
            Ok(Box::new(mapped) as StreamItems)
        });
        self.stream.insert(name.to_string(), wrapped);
        self
    }

    pub fn names(&self) -> Vec<String> {
        let mut names: Vec<String> = self
            .one_shot
            .keys()
            .chain(self.stream.keys())
            .cloned()
            .collect();
        names.sort();
        names
    }

    pub fn functions(&self) -> Vec<serde_json::Value> {
        let mut list: Vec<serde_json::Value> = Vec::new();
        for name in self.one_shot.keys() {
            list.push(serde_json::json!({ "name": name, "kind": "function" }));
        }
        for name in self.stream.keys() {
            list.push(serde_json::json!({ "name": name, "kind": "streamFunction" }));
        }
        list.sort_by(|a, b| a["name"].as_str().cmp(&b["name"].as_str()));
        list
    }

    pub fn load_configs(&mut self, configs: HashMap<String, FunctionConfig>) {
        self.configs = configs;
    }

    pub fn config(&self, name: &str) -> Option<&FunctionConfig> {
        self.configs.get(name)
    }
}