use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use super::edge_config::EdgeConfig;
use crate::core::ops::cache::CacheConfig;
use crate::core::utils::common::Param;
/// Kind of operation an [`OpConfig`] describes.
///
/// Every variant is (de)serialized as the string given in its
/// `#[serde(rename = "...")]` attribute; [`OpType::as_str`] returns the same
/// strings. `OpType::default()` yields [`OpType::Default`], now expressed with
/// `#[derive(Default)]` + `#[default]` in the same way as [`OpBound`] instead
/// of a hand-written `impl Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum OpType {
    #[serde(rename = "data")]
    Data,
    #[serde(rename = "llm")]
    Llm,
    #[serde(rename = "embedding")]
    Embedding,
    #[serde(rename = "rerank")]
    Rerank,
    #[serde(rename = "branch")]
    Branch,
    // Wire name is the bare keyword, not the variant name.
    #[serde(rename = "for")]
    ForLoop,
    // Wire name is the bare keyword, not the variant name.
    #[serde(rename = "while")]
    WhileLoop,
    #[serde(rename = "stream")]
    Stream,
    #[serde(rename = "code")]
    Code,
    #[serde(rename = "lambda")]
    Lambda,
    #[serde(rename = "parser")]
    Parser,
    #[serde(rename = "prompt")]
    Prompt,
    #[serde(rename = "doc-processor")]
    DocProcessor,
    #[serde(rename = "milvus")]
    Milvus,
    #[serde(rename = "mongo")]
    Mongo,
    #[serde(rename = "s3")]
    S3,
    #[serde(rename = "graph")]
    Graph,
    /// The variant returned by `OpType::default()`.
    #[default]
    #[serde(rename = "default")]
    Default,
    #[serde(rename = "dummy")]
    Dummy,
    #[serde(rename = "tool-executor")]
    ToolExecutor,
    #[serde(rename = "mcp")]
    Mcp,
    #[serde(rename = "triton")]
    Triton,
    #[serde(rename = "onnx")]
    Onnx,
}
impl OpType {
pub fn as_str(&self) -> &'static str {
match self {
OpType::Data => "data",
OpType::Llm => "llm",
OpType::Embedding => "embedding",
OpType::Rerank => "rerank",
OpType::Branch => "branch",
OpType::ForLoop => "for",
OpType::WhileLoop => "while",
OpType::Stream => "stream",
OpType::Code => "code",
OpType::Lambda => "lambda",
OpType::Parser => "parser",
OpType::Prompt => "prompt",
OpType::DocProcessor => "doc-processor",
OpType::Milvus => "milvus",
OpType::Mongo => "mongo",
OpType::S3 => "s3",
OpType::Graph => "graph",
OpType::Default => "default",
OpType::Dummy => "dummy",
OpType::ToolExecutor => "tool-executor",
OpType::Mcp => "mcp",
OpType::Triton => "triton",
OpType::Onnx => "onnx",
}
}
}
/// Bound classification of an op, (de)serialized as `"sync"`, `"io"` or `"cpu"`.
///
/// [`OpBound::Sync`] is the default.
/// NOTE(review): `Io` / `Cpu` presumably mark I/O-bound and CPU-bound work for
/// whatever executes the op — confirm against the consumer of this field.
#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub enum OpBound {
    #[default]
    #[serde(rename = "sync")]
    Sync,
    #[serde(rename = "io")]
    Io,
    #[serde(rename = "cpu")]
    Cpu,
}
/// A single link to a destination, carried on the wire as the pair `[dst, soft]`.
///
/// The default value has an empty `dst` and `soft == false`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct CompiledLink {
    /// Destination name of the link.
    pub dst: String,
    /// Soft-link flag. NOTE(review): the exact meaning of "soft" is defined by
    /// the consumer of this type and is not visible here.
    pub soft: bool,
}
impl Serialize for CompiledLink {
    /// Serializes the link as the 2-tuple `(dst, soft)` rather than as a map.
    ///
    /// Delegates to serde's tuple implementation, which emits a tuple of
    /// length 2 with `dst` first and `soft` second.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        (&self.dst, &self.soft).serialize(serializer)
    }
}
impl<'de> Deserialize<'de> for CompiledLink {
    /// Reads a `(String, bool)` pair and maps it onto `dst` / `soft`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        <(String, bool) as Deserialize>::deserialize(deserializer)
            .map(|(dst, soft)| Self { dst, soft })
    }
}
/// Loop settings, referenced from `OpConfig::loop_config`.
///
/// Both fields are optional: a missing key or an explicit `null` deserializes
/// to `None`. Keys other than `until` and `max_iterations` are rejected.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct LoopConfig {
    /// Optional string. NOTE(review): presumably a termination condition
    /// expression (the tests in this file use `"count >= 5"`) — confirm.
    pub until: Option<String>,
    /// Optional unsigned count. NOTE(review): presumably an upper bound on the
    /// number of iterations — confirm against the loop executor.
    pub max_iterations: Option<u32>,
}
/// Serde `default = "..."` provider that always yields `true`.
///
/// Used so that `OpConfig::enabled` is `true` when the key is omitted.
fn true_default() -> bool {
    const ENABLED_WHEN_OMITTED: bool = true;
    ENABLED_WHEN_OMITTED
}
/// Serde `default = "..."` provider for `OpConfig::max_stream_concurrent`.
///
/// Always returns 64.
fn default_concurrency() -> u32 {
    const DEFAULT_MAX_STREAM_CONCURRENT: u32 = 64;
    DEFAULT_MAX_STREAM_CONCURRENT
}
/// Configuration of a single op, (de)serialized with serde.
///
/// Only `type` (stored in `kind`) and `name` lack a serde default and are
/// therefore required on input; every other key may be omitted. Unknown keys
/// are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct OpConfig {
/// Op type; read from and written to the `type` key.
#[serde(rename = "type")]
pub kind: OpType,
/// Op name. Required: no serde default is declared.
pub name: String,
/// Full name; empty string when omitted.
/// NOTE(review): the tests in this file use dotted values such as
/// `main.double` — presumably a parent-qualified path; confirm.
#[serde(default)]
pub full_name: String,
/// Defaults to `true` when omitted (via `true_default`).
#[serde(default = "true_default")]
pub enabled: bool,
/// Defaults to `false` when omitted.
#[serde(default)]
pub verbose: bool,
/// Defaults to `false` when omitted.
#[serde(default)]
pub stream: bool,
/// Defaults to `OpBound::Sync` when omitted.
#[serde(default)]
pub bound: OpBound,
/// Input parameters keyed by string; empty when omitted.
#[serde(default)]
pub inputs: BTreeMap<String, Param>,
/// Output parameters keyed by string; empty when omitted.
#[serde(default)]
pub outputs: BTreeMap<String, Param>,
/// Optional cache configuration.
#[serde(default)]
pub cache: Option<CacheConfig>,
/// Defaults to `0.0`. NOTE(review): the unit is not visible here — confirm.
#[serde(default)]
pub delay: f64,
/// Optional description text.
#[serde(default)]
pub description: Option<String>,
/// Optional function name.
#[serde(default)]
pub func_name: Option<String>,
/// Defaults to `false`. NOTE(review): presumably describes the function
/// named by `func_name` — confirm.
#[serde(default)]
pub is_async: bool,
/// Defaults to `false`. NOTE(review): presumably describes the function
/// named by `func_name` — confirm.
#[serde(default)]
pub is_generator: bool,
/// Nested op configs keyed by string; empty when omitted.
#[serde(default)]
pub ops: BTreeMap<String, OpConfig>,
/// Edge configs; empty when omitted.
#[serde(default)]
pub edges: Vec<EdgeConfig>,
/// Entry names; empty when omitted. NOTE(review): presumably keys of `ops`.
#[serde(default)]
pub entries: Vec<String>,
/// Exit names; empty when omitted. NOTE(review): presumably keys of `ops`.
#[serde(default)]
pub exits: Vec<String>,
/// Integer count per key; empty when omitted.
/// NOTE(review): presumably precomputed scheduling data — confirm.
#[serde(default)]
pub initial_ready_count: BTreeMap<String, i32>,
/// List of `CompiledLink`s per key; empty when omitted.
/// NOTE(review): presumably a compiled adjacency list keyed by source — confirm.
#[serde(default)]
pub compiled_adj: BTreeMap<String, Vec<CompiledLink>>,
/// Two-level map of integer counts; empty when omitted.
#[serde(default)]
pub stream_initial_ready: BTreeMap<String, BTreeMap<String, i32>>,
/// Optional loop settings.
#[serde(default)]
pub loop_config: Option<LoopConfig>,
/// Defaults to 64 when omitted (via `default_concurrency`).
#[serde(default = "default_concurrency")]
pub max_stream_concurrent: u32,
/// Arbitrary JSON value. `resource_keys` interprets a string as one key and
/// an array as a list of string keys; `is_multi_resource` checks for an array.
#[serde(default)]
pub resource: Option<Value>,
/// Optional list of `f32` ratios.
#[serde(default)]
pub ratios: Option<Vec<f32>>,
/// Optional list of fallback names.
#[serde(default)]
pub fallback: Option<Vec<String>>,
/// Defaults to `false` when omitted.
#[serde(default)]
pub batch_mode: bool,
/// Optional arbitrary JSON value.
#[serde(default)]
pub resource_config: Option<Value>,
/// Arbitrary JSON values; empty when omitted.
#[serde(default)]
pub resource_configs: Vec<Value>,
/// Arbitrary JSON values; empty when omitted.
#[serde(default)]
pub fallback_configs: Vec<Value>,
/// String-to-string mapping; empty when omitted.
#[serde(default)]
pub inputs_map: BTreeMap<String, String>,
/// String-to-string mapping; empty when omitted.
#[serde(default)]
pub outputs_map: BTreeMap<String, String>,
/// Accepted on input but never written back out (`skip_serializing`).
/// NOTE(review): `rename = "python_callable"` equals the field name and is
/// therefore redundant.
#[serde(default, skip_serializing, rename = "python_callable")]
pub python_callable: Option<Value>,
}
impl Default for OpConfig {
fn default() -> Self {
Self {
kind: OpType::default(),
name: String::new(),
full_name: String::new(),
enabled: true,
verbose: false,
stream: false,
bound: OpBound::default(),
inputs: BTreeMap::new(),
outputs: BTreeMap::new(),
cache: None,
delay: 0.0,
description: None,
func_name: None,
is_async: false,
is_generator: false,
ops: BTreeMap::new(),
edges: Vec::new(),
entries: Vec::new(),
exits: Vec::new(),
initial_ready_count: BTreeMap::new(),
compiled_adj: BTreeMap::new(),
stream_initial_ready: BTreeMap::new(),
loop_config: None,
max_stream_concurrent: default_concurrency(),
resource: None,
ratios: None,
fallback: None,
batch_mode: false,
resource_config: None,
resource_configs: Vec::new(),
fallback_configs: Vec::new(),
inputs_map: BTreeMap::new(),
outputs_map: BTreeMap::new(),
python_callable: None,
}
}
}
impl OpConfig {
    /// Returns the string keys stored in `resource`.
    ///
    /// * a JSON string yields a one-element list;
    /// * a JSON array yields its string elements, skipping non-string ones;
    /// * anything else (absent, `null`, number, object, ...) yields an empty list.
    pub fn resource_keys(&self) -> Vec<String> {
        match self.resource.as_ref() {
            Some(Value::String(key)) => vec![key.to_owned()],
            Some(Value::Array(items)) => items
                .iter()
                .filter_map(Value::as_str)
                .map(String::from)
                .collect(),
            // Covers `None`, `Value::Null` and every other JSON shape.
            _ => Vec::new(),
        }
    }

    /// Returns `true` when `resource` holds a JSON array (of any length).
    pub fn is_multi_resource(&self) -> bool {
        matches!(self.resource, Some(Value::Array(_)))
    }
}
impl OpConfig {
    /// Returns `true` when this op's `kind` is [`OpType::Graph`].
    pub fn is_graph(&self) -> bool {
        self.kind == OpType::Graph
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `CompiledLink` serializes as a two-element JSON array and parses back unchanged.
    #[test]
    fn compiled_link_roundtrips_as_tuple() {
        let link = CompiledLink {
            dst: "next".into(),
            soft: true,
        };
        let json = serde_json::to_string(&link).unwrap();
        assert_eq!(json, r#"["next",true]"#);
        let back: CompiledLink = serde_json::from_str(&json).unwrap();
        assert_eq!(back, link);
    }

    /// A graph-type config with empty collections parses and picks up the
    /// default `max_stream_concurrent`.
    #[test]
    fn op_config_parses_graph_envelope() {
        let src = r#"{
"type": "graph",
"name": "main",
"full_name": "main",
"ops": {},
"edges": [],
"entries": [],
"exits": [],
"initial_ready_count": {},
"compiled_adj": {},
"stream_initial_ready": {}
}"#;
        let cfg: OpConfig = serde_json::from_str(src).unwrap();
        assert!(cfg.is_graph());
        assert_eq!(cfg.name, "main");
        assert_eq!(cfg.max_stream_concurrent, 64);
    }

    /// A code-type config carrying function metadata parses into the expected fields.
    #[test]
    fn op_config_parses_func_op() {
        let src = r#"{
"type": "code",
"name": "double",
"full_name": "main.double",
"func_name": "double",
"is_async": false,
"is_generator": false,
"bound": "sync"
}"#;
        let cfg: OpConfig = serde_json::from_str(src).unwrap();
        assert!(!cfg.is_graph());
        assert_eq!(cfg.kind, OpType::Code);
        assert_eq!(cfg.func_name.as_deref(), Some("double"));
        assert_eq!(cfg.bound, OpBound::Sync);
    }

    /// `python_callable` is tolerated on input (despite `deny_unknown_fields`)
    /// but must not appear in re-serialized output.
    #[test]
    fn python_callable_is_accepted_and_dropped_on_reserialize() {
        let src = r#"{
"type": "code",
"name": "f",
"full_name": "main.f",
"python_callable": "<function f at 0x…>"
}"#;
        let cfg: OpConfig = serde_json::from_str(src).unwrap();
        let out = serde_json::to_string(&cfg).unwrap();
        assert!(!out.contains("python_callable"));
    }

    /// `until` accepts a string or `null`, and rejects any other JSON type.
    #[test]
    fn loop_config_only_accepts_string_until() {
        let src = r#"{"until": "count >= 5", "max_iterations": 10}"#;
        let cfg: LoopConfig = serde_json::from_str(src).unwrap();
        assert_eq!(cfg.until.as_deref(), Some("count >= 5"));
        assert_eq!(cfg.max_iterations, Some(10));

        let none_src = r#"{"until": null, "max_iterations": null}"#;
        let cfg: LoopConfig = serde_json::from_str(none_src).unwrap();
        assert!(cfg.until.is_none());
        assert!(cfg.max_iterations.is_none());

        // The test name promises rejection of non-string values; pin it down.
        let non_string_src = r#"{"until": 5}"#;
        assert!(serde_json::from_str::<LoopConfig>(non_string_src).is_err());
    }
}