1use async_trait::async_trait;
2
3use rustvello_proto::call::CallDTO;
4use rustvello_proto::identifiers::{CallId, InvocationId};
5use rustvello_proto::invocation::{InvocationDTO, InvocationHistory};
6
7use crate::context::RunnerContext;
8
9use crate::error::{RustvelloResult, TaskError};
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
20pub struct StoredRunnerContext {
21 pub runner_cls: String,
23 pub runner_id: String,
25 pub pid: u32,
26 pub hostname: String,
27 pub thread_id: u64,
28 pub started_at: chrono::DateTime<chrono::Utc>,
29 pub parent_runner_id: Option<String>,
31 pub parent_runner_cls: Option<String>,
33}
34
35impl StoredRunnerContext {
36 pub fn current(runner_id: impl Into<String>, runner_cls: impl Into<String>) -> Self {
37 let thread_id = format!("{:?}", std::thread::current().id());
38 let thread_num: u64 = thread_id
39 .trim_start_matches("ThreadId(")
40 .trim_end_matches(')')
41 .parse()
42 .unwrap_or(0);
43 Self {
44 runner_cls: runner_cls.into(),
45 runner_id: runner_id.into(),
46 pid: std::process::id(),
47 hostname: hostname::get().map_or_else(
48 |_| "unknown".to_string(),
49 |h| h.to_string_lossy().into_owned(),
50 ),
51 thread_id: thread_num,
52 started_at: chrono::Utc::now(),
53 parent_runner_id: None,
54 parent_runner_cls: None,
55 }
56 }
57
58 pub fn new_child(
60 &self,
61 child_runner_id: impl Into<String>,
62 child_runner_cls: impl Into<String>,
63 ) -> Self {
64 let mut child = Self::current(child_runner_id, child_runner_cls);
65 child.parent_runner_id = Some(self.runner_id.clone());
66 child.parent_runner_cls = Some(self.runner_cls.clone());
67 child
68 }
69
70 pub fn from_runtime(ctx: &RunnerContext) -> Self {
75 Self {
76 runner_cls: ctx.runner_cls.as_ref().to_string(),
77 runner_id: ctx.runner_id.to_string(),
78 pid: ctx.pid,
79 hostname: ctx.hostname.clone(),
80 thread_id: ctx.thread_id,
81 started_at: chrono::Utc::now(),
82 parent_runner_id: ctx.parent_ctx.as_ref().map(|p| p.runner_id.to_string()),
83 parent_runner_cls: ctx
84 .parent_ctx
85 .as_ref()
86 .map(|p| p.runner_cls.as_ref().to_string()),
87 }
88 }
89}
90
91pub trait StateBackend: StateBackendCore + StateBackendQuery + StateBackendRunner {}
101
102impl<T: StateBackendCore + StateBackendQuery + StateBackendRunner> StateBackend for T {}
103
104#[async_trait]
112pub trait StateBackendCore: Send + Sync {
113 async fn upsert_invocation(
117 &self,
118 invocation: &InvocationDTO,
119 call: &CallDTO,
120 ) -> RustvelloResult<()>;
121
122 async fn get_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<InvocationDTO>;
124
125 async fn get_call(&self, call_id: &CallId) -> RustvelloResult<CallDTO>;
127
128 async fn store_result(&self, invocation_id: &InvocationId, result: &str)
132 -> RustvelloResult<()>;
133
134 async fn get_result(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<String>>;
136
137 async fn store_error(
139 &self,
140 invocation_id: &InvocationId,
141 error: &TaskError,
142 ) -> RustvelloResult<()>;
143
144 async fn get_error(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<TaskError>>;
146
147 async fn add_history(&self, history: &InvocationHistory) -> RustvelloResult<()>;
151
152 async fn get_history(
154 &self,
155 invocation_id: &InvocationId,
156 ) -> RustvelloResult<Vec<InvocationHistory>>;
157
158 async fn purge(&self) -> RustvelloResult<()>;
162
163 fn backend_name(&self) -> &'static str {
165 "Unknown"
166 }
167
168 async fn usage_stats(&self) -> Vec<(&'static str, String)> {
170 Vec::new()
171 }
172}
173
174#[async_trait]
180pub trait StateBackendQuery: Send + Sync {
181 async fn get_workflow_invocations(
185 &self,
186 workflow_id: &InvocationId,
187 ) -> RustvelloResult<Vec<InvocationId>>;
188
189 async fn get_child_invocations(
191 &self,
192 parent_invocation_id: &InvocationId,
193 ) -> RustvelloResult<Vec<InvocationId>>;
194
195 async fn store_workflow_run(
199 &self,
200 workflow: &rustvello_proto::invocation::WorkflowIdentity,
201 ) -> RustvelloResult<()>;
202
203 async fn get_all_workflow_types(
205 &self,
206 ) -> RustvelloResult<Vec<rustvello_proto::identifiers::TaskId>>;
207
208 async fn get_workflow_runs(
210 &self,
211 workflow_type: &rustvello_proto::identifiers::TaskId,
212 ) -> RustvelloResult<Vec<rustvello_proto::invocation::WorkflowIdentity>>;
213
214 async fn set_workflow_data(
218 &self,
219 workflow_id: &InvocationId,
220 key: &str,
221 value: &str,
222 ) -> RustvelloResult<()>;
223
224 async fn get_workflow_data(
226 &self,
227 workflow_id: &InvocationId,
228 key: &str,
229 ) -> RustvelloResult<Option<String>>;
230
231 async fn store_app_info(&self, app_id: &str, info_json: &str) -> RustvelloResult<()>;
235
236 async fn get_app_info(&self, app_id: &str) -> RustvelloResult<Option<String>>;
238
239 async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>>;
241
242 async fn store_workflow_sub_invocation(
246 &self,
247 workflow_id: &InvocationId,
248 sub_inv_id: &InvocationId,
249 ) -> RustvelloResult<()>;
250
251 async fn get_workflow_sub_invocations(
253 &self,
254 workflow_id: &InvocationId,
255 ) -> RustvelloResult<Vec<InvocationId>>;
256
257 async fn get_all_workflow_runs(
264 &self,
265 ) -> RustvelloResult<Vec<rustvello_proto::invocation::WorkflowIdentity>> {
266 let types = self.get_all_workflow_types().await?;
267 let mut all = Vec::new();
268 for t in &types {
269 let runs = self.get_workflow_runs(t).await?;
270 all.extend(runs);
271 }
272 Ok(all)
273 }
274}
275
276#[async_trait]
282pub trait StateBackendRunner: Send + Sync {
283 async fn store_runner_context(&self, context: &StoredRunnerContext) -> RustvelloResult<()>;
287
288 async fn get_runner_context(
290 &self,
291 runner_id: &str,
292 ) -> RustvelloResult<Option<StoredRunnerContext>>;
293
294 async fn get_runner_contexts_by_parent(
296 &self,
297 parent_runner_id: &str,
298 ) -> RustvelloResult<Vec<StoredRunnerContext>>;
299
300 async fn get_invocation_ids_by_runner(
304 &self,
305 runner_id: &str,
306 limit: usize,
307 offset: usize,
308 ) -> RustvelloResult<Vec<InvocationId>>;
309
310 async fn count_invocations_by_runner(&self, runner_id: &str) -> RustvelloResult<usize>;
312
313 async fn get_history_in_timerange(
317 &self,
318 start: chrono::DateTime<chrono::Utc>,
319 end: chrono::DateTime<chrono::Utc>,
320 limit: usize,
321 offset: usize,
322 ) -> RustvelloResult<Vec<InvocationHistory>>;
323
324 async fn get_matching_runner_contexts(
330 &self,
331 partial_id: &str,
332 ) -> RustvelloResult<Vec<StoredRunnerContext>>;
333}