1use crate::Exchange;
2
/// Identifier of a function, stored as an owned string.
///
/// [`FunctionId::compute`] produces identifiers made of 32 lowercase hex
/// characters, but the inner field is public, so any string can be wrapped.
/// Equality and hashing are those of the inner string.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct FunctionId(pub String);
5
6impl std::fmt::Display for FunctionId {
7 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8 f.write_str(&self.0)
9 }
10}
11
12impl FunctionId {
13 pub fn compute(runtime: &str, source: &str, timeout_ms_resolved: u64) -> Self {
14 let mut hasher = blake3::Hasher::new();
15 let rlen = (runtime.len() as u64).to_le_bytes();
16 hasher.update(&rlen);
17 hasher.update(runtime.as_bytes());
18 let slen = (source.len() as u64).to_le_bytes();
19 hasher.update(&slen);
20 hasher.update(source.as_bytes());
21 hasher.update(&timeout_ms_resolved.to_le_bytes());
22 let hash = hasher.finalize();
23 let truncated = &hash.as_bytes()[..16];
24 let hex: String = truncated.iter().map(|b| format!("{:02x}", b)).collect();
25 Self(hex)
26 }
27}
28
/// Description of a single function: its identifier, runtime, source text and
/// timeout, plus optional route/step coordinates.
#[derive(Debug, Clone)]
pub struct FunctionDefinition {
    /// Identifier of the function.
    ///
    /// NOTE(review): presumably produced by `FunctionId::compute` from
    /// `runtime`, `source` and `timeout_ms`; nothing in this type enforces
    /// that, so verify against the code that builds definitions.
    pub id: FunctionId,
    /// Name of the runtime. The set of valid names is not defined here.
    pub runtime: String,
    /// Source text of the function.
    pub source: String,
    /// Timeout in milliseconds, as the field name indicates.
    pub timeout_ms: u64,
    /// Route identifier associated with this definition, if any.
    pub route_id: Option<String>,
    /// Step index associated with this definition, if any.
    ///
    /// NOTE(review): presumably the position of the step inside the route
    /// named by `route_id` — confirm.
    pub step_index: Option<usize>,
}
38
/// Set of changes returned by `FunctionInvoker::invoke`.
///
/// The `Default` value is an empty patch: no body and empty lists. How a patch
/// is applied to an `Exchange` (including the relative order of header sets
/// and removals) is decided by the code that consumes it, not by this type.
#[derive(Debug, Clone, Default)]
pub struct ExchangePatch {
    /// Body carried by the patch, if any.
    ///
    /// NOTE(review): `None` presumably means "leave the body untouched", as
    /// opposed to `Some(PatchBody::Empty)` — confirm against the consumer.
    pub body: Option<PatchBody>,
    /// Header name/value pairs to set; values are arbitrary JSON.
    pub headers_set: Vec<(String, serde_json::Value)>,
    /// Names of headers to remove.
    pub headers_removed: Vec<String>,
    /// Property name/value pairs to set; values are arbitrary JSON.
    pub properties_set: Vec<(String, serde_json::Value)>,
}
46
/// Body payload carried by an [`ExchangePatch`].
#[derive(Debug, Clone)]
pub enum PatchBody {
    /// Body given as a string.
    Text(String),
    /// Body given as a JSON value.
    Json(serde_json::Value),
    /// Explicit empty body.
    ///
    /// NOTE(review): presumably instructs the consumer to clear the body —
    /// confirm where patches are applied.
    Empty,
}
53
/// Errors returned by the [`FunctionInvoker`] operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, thiserror::Error)]
pub enum FunctionInvocationError {
    /// The function with the given id is not registered on the runtime.
    #[error("function {function_id} not registered on runtime")]
    NotRegistered { function_id: FunctionId },
    /// The function did not finish within `timeout_ms` milliseconds.
    #[error("function {function_id} timed out after {timeout_ms}ms")]
    Timeout {
        function_id: FunctionId,
        timeout_ms: u64,
    },
    /// The runner could not be used; `reason` carries the explanation.
    #[error("runner unavailable: {reason}")]
    RunnerUnavailable { reason: String },
    /// The user's code failed.
    ///
    /// Only `message` appears in the `Display` output; `function_id` and the
    /// optional `stack` are available as fields but are not printed.
    #[error("user code failed: {message}")]
    UserError {
        function_id: FunctionId,
        message: String,
        stack: Option<String>,
    },
    /// Transport-level failure, described by the wrapped string.
    #[error("transport error: {0}")]
    Transport(String),
    /// A patch was rejected as invalid, for the reason in the wrapped string.
    #[error("invalid patch: {0}")]
    InvalidPatch(String),
}
76
/// Difference between two sets of functions, passed to the reload methods of
/// [`FunctionInvoker`].
///
/// NOTE(review): the `Option<String>` paired with each entry is presumably the
/// owning route id (it mirrors the `route_id` arguments of
/// `FunctionInvoker::register` / `unregister`) — confirm against the code that
/// builds the diff.
#[derive(Debug, Default, Clone)]
pub struct FunctionDiff {
    /// Definitions listed as added, each with its optional string tag.
    pub added: Vec<(FunctionDefinition, Option<String>)>,
    /// Identifiers listed as removed, each with its optional string tag.
    pub removed: Vec<(FunctionId, Option<String>)>,
    /// Identifiers listed as unchanged.
    pub unchanged: Vec<FunctionId>,
}
83
/// Value returned by `FunctionInvoker::prepare_reload` and consumed by
/// `FunctionInvoker::rollback_reload`.
#[derive(Debug, Default, Clone)]
pub struct PrepareToken {
    /// Definitions recorded by the prepare step, each with an optional string
    /// tag.
    ///
    /// NOTE(review): presumably the functions that were registered during
    /// prepare and must be undone on rollback, with the tag being the route
    /// id — confirm against an implementation.
    pub registered: Vec<(FunctionDefinition, Option<String>)>,
}
88
/// Synchronous (non-`async`) part of the function-invoker API: staging of
/// definitions per reload generation and lookups by route.
///
/// Implementations must be `Send + Sync`. All methods take `&self`, so any
/// state they change has to use interior mutability.
///
/// NOTE(review): the behavioural descriptions below are inferred from names
/// and signatures only; no implementation is visible in this file.
pub trait FunctionInvokerSync: Send + Sync {
    /// Stages `def` for the given `generation`, optionally tagged with a
    /// route id.
    fn stage_pending(&self, def: FunctionDefinition, route_id: Option<&str>, generation: u64);
    /// Discards whatever was staged for `generation`.
    fn discard_staging(&self, generation: u64);
    /// Starts a reload and returns a `u64` — presumably the generation number
    /// expected by the other methods; confirm.
    fn begin_reload(&self) -> u64;
    /// Returns the function references associated with `route_id`, as
    /// `(id, optional string)` pairs.
    fn function_refs_for_route(&self, route_id: &str) -> Vec<(FunctionId, Option<String>)>;
    /// Returns the function references staged for `route_id` in `generation`.
    fn staged_refs_for_route(
        &self,
        route_id: &str,
        generation: u64,
    ) -> Vec<(FunctionId, Option<String>)>;
    /// Returns the full definitions staged for `route_id` in `generation`.
    fn staged_defs_for_route(
        &self,
        route_id: &str,
        generation: u64,
    ) -> Vec<(FunctionDefinition, Option<String>)>;
}
105
/// Asynchronous part of the function-invoker API: registration, invocation
/// and the reload protocol.
///
/// Every method returns [`FunctionInvocationError`] on failure. Only
/// `commit_reload` has a default implementation; the rest must be provided by
/// the implementor.
#[async_trait::async_trait]
pub trait FunctionInvoker: FunctionInvokerSync + Send + Sync {
    /// Registers `def`, optionally associated with `route_id`.
    async fn register(
        &self,
        def: FunctionDefinition,
        route_id: Option<&str>,
    ) -> Result<(), FunctionInvocationError>;
    /// Unregisters the function `id`, optionally scoped to `route_id`.
    async fn unregister(
        &self,
        id: &FunctionId,
        route_id: Option<&str>,
    ) -> Result<(), FunctionInvocationError>;
    /// Invokes the function `id` with `exchange` and returns the resulting
    /// [`ExchangePatch`]. The exchange is borrowed immutably.
    async fn invoke(
        &self,
        id: &FunctionId,
        exchange: &Exchange,
    ) -> Result<ExchangePatch, FunctionInvocationError>;
    /// First phase of a reload: processes `diff` for `generation` and returns
    /// a [`PrepareToken`] that can later be handed to `rollback_reload`.
    async fn prepare_reload(
        &self,
        diff: FunctionDiff,
        generation: u64,
    ) -> Result<PrepareToken, FunctionInvocationError>;
    /// Second phase of a reload: finalizes `diff` for `generation`.
    async fn finalize_reload(
        &self,
        diff: &FunctionDiff,
        generation: u64,
    ) -> Result<(), FunctionInvocationError>;
    /// Undoes a prepared reload described by `token` for `generation`.
    async fn rollback_reload(
        &self,
        token: PrepareToken,
        generation: u64,
    ) -> Result<(), FunctionInvocationError>;
    /// Runs both reload phases back to back: `prepare_reload` on a clone of
    /// `diff`, then `finalize_reload` on `diff` itself.
    ///
    /// An error from `prepare_reload` is returned immediately and
    /// `finalize_reload` is not called.
    ///
    /// NOTE(review): the token returned by `prepare_reload` is only held, not
    /// used; if `finalize_reload` fails, this default does not call
    /// `rollback_reload`. Confirm that callers or implementors handle cleanup.
    async fn commit_reload(
        &self,
        diff: FunctionDiff,
        generation: u64,
    ) -> Result<(), FunctionInvocationError> {
        let _token = self.prepare_reload(diff.clone(), generation).await?;
        self.finalize_reload(&diff, generation).await
    }
    /// Commits what has been staged.
    ///
    /// NOTE(review): presumably applies the definitions staged through
    /// `FunctionInvokerSync::stage_pending`; which generation is used is not
    /// visible from the signature — confirm against an implementation.
    async fn commit_staged(&self) -> Result<(), FunctionInvocationError>;
}