Skip to main content

camel_api/
function.rs

1use crate::Exchange;
2
/// Newtype around the string form of a function identifier.
///
/// Equality and hashing are derived from the wrapped string, so the type can
/// be used as a key in hash-based collections.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FunctionId(pub String);
5
6impl std::fmt::Display for FunctionId {
7    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8        f.write_str(&self.0)
9    }
10}
11
12impl FunctionId {
13    pub fn compute(runtime: &str, source: &str, timeout_ms_resolved: u64) -> Self {
14        let mut hasher = blake3::Hasher::new();
15        let rlen = (runtime.len() as u64).to_le_bytes();
16        hasher.update(&rlen);
17        hasher.update(runtime.as_bytes());
18        let slen = (source.len() as u64).to_le_bytes();
19        hasher.update(&slen);
20        hasher.update(source.as_bytes());
21        hasher.update(&timeout_ms_resolved.to_le_bytes());
22        let hash = hasher.finalize();
23        let truncated = &hash.as_bytes()[..16];
24        let hex: String = truncated.iter().map(|b| format!("{:02x}", b)).collect();
25        Self(hex)
26    }
27}
28
29#[derive(Debug, Clone)]
30pub struct FunctionDefinition {
31    pub id: FunctionId,
32    pub runtime: String,
33    pub source: String,
34    pub timeout_ms: u64,
35    pub route_id: Option<String>,
36    pub step_index: Option<usize>,
37}
38
39#[derive(Debug, Clone, Default)]
40pub struct ExchangePatch {
41    pub body: Option<PatchBody>,
42    pub headers_set: Vec<(String, serde_json::Value)>,
43    pub headers_removed: Vec<String>,
44    pub properties_set: Vec<(String, serde_json::Value)>,
45}
46
47#[derive(Debug, Clone)]
48pub enum PatchBody {
49    Text(String),
50    Json(serde_json::Value),
51    Empty,
52}
53
54#[derive(Debug, thiserror::Error)]
55pub enum FunctionInvocationError {
56    #[error("function {function_id} not registered on runtime")]
57    NotRegistered { function_id: FunctionId },
58    #[error("function {function_id} timed out after {timeout_ms}ms")]
59    Timeout {
60        function_id: FunctionId,
61        timeout_ms: u64,
62    },
63    #[error("runner unavailable: {reason}")]
64    RunnerUnavailable { reason: String },
65    #[error("user code failed: {message}")]
66    UserError {
67        function_id: FunctionId,
68        message: String,
69        stack: Option<String>,
70    },
71    #[error("transport error: {0}")]
72    Transport(String),
73    #[error("invalid patch: {0}")]
74    InvalidPatch(String),
75}
76
77#[derive(Debug, Default, Clone)]
78pub struct FunctionDiff {
79    pub added: Vec<(FunctionDefinition, Option<String>)>,
80    pub removed: Vec<(FunctionId, Option<String>)>,
81    pub unchanged: Vec<FunctionId>,
82}
83
84#[derive(Debug, Default, Clone)]
85pub struct PrepareToken {
86    pub registered: Vec<(FunctionDefinition, Option<String>)>,
87}
88
89pub trait FunctionInvokerSync: Send + Sync {
90    fn stage_pending(&self, def: FunctionDefinition, route_id: Option<&str>, generation: u64);
91    fn discard_staging(&self, generation: u64);
92    fn begin_reload(&self) -> u64;
93    fn function_refs_for_route(&self, route_id: &str) -> Vec<(FunctionId, Option<String>)>;
94    fn staged_refs_for_route(
95        &self,
96        route_id: &str,
97        generation: u64,
98    ) -> Vec<(FunctionId, Option<String>)>;
99    fn staged_defs_for_route(
100        &self,
101        route_id: &str,
102        generation: u64,
103    ) -> Vec<(FunctionDefinition, Option<String>)>;
104}
105
106#[async_trait::async_trait]
107pub trait FunctionInvoker: FunctionInvokerSync + Send + Sync {
108    async fn register(
109        &self,
110        def: FunctionDefinition,
111        route_id: Option<&str>,
112    ) -> Result<(), FunctionInvocationError>;
113    async fn unregister(
114        &self,
115        id: &FunctionId,
116        route_id: Option<&str>,
117    ) -> Result<(), FunctionInvocationError>;
118    async fn invoke(
119        &self,
120        id: &FunctionId,
121        exchange: &Exchange,
122    ) -> Result<ExchangePatch, FunctionInvocationError>;
123    async fn prepare_reload(
124        &self,
125        diff: FunctionDiff,
126        generation: u64,
127    ) -> Result<PrepareToken, FunctionInvocationError>;
128    async fn finalize_reload(
129        &self,
130        diff: &FunctionDiff,
131        generation: u64,
132    ) -> Result<(), FunctionInvocationError>;
133    async fn rollback_reload(
134        &self,
135        token: PrepareToken,
136        generation: u64,
137    ) -> Result<(), FunctionInvocationError>;
138    async fn commit_reload(
139        &self,
140        diff: FunctionDiff,
141        generation: u64,
142    ) -> Result<(), FunctionInvocationError> {
143        let _token = self.prepare_reload(diff.clone(), generation).await?;
144        self.finalize_reload(&diff, generation).await
145    }
146    async fn commit_staged(&self) -> Result<(), FunctionInvocationError>;
147}