Skip to main content

rustvello_core/
state_backend.rs

1use async_trait::async_trait;
2
3use rustvello_proto::call::CallDTO;
4use rustvello_proto::identifiers::{CallId, InvocationId};
5use rustvello_proto::invocation::{InvocationDTO, InvocationHistory};
6
7use crate::context::RunnerContext;
8
9use crate::error::{RustvelloResult, TaskError};
10
/// Runner execution context — metadata about the environment running an invocation.
///
/// Mirrors pynenc's `RunnerContext`: captures runner type, PID, hostname,
/// thread ID, and optional parent context for hierarchical runner relationships
/// (e.g., PersistentProcessRunner → PPRWorker).
///
/// Named `StoredRunnerContext` to distinguish from `context::RunnerContext` (the
/// runtime task-local context used during invocation execution).
///
/// This is the serializable form handed to
/// [`StateBackendRunner::store_runner_context`] for monitoring.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StoredRunnerContext {
    /// Class/type name of the runner (e.g. "TaskRunner", "PPRWorker").
    pub runner_cls: String,
    /// Unique identifier for this runner instance.
    pub runner_id: String,
    /// Process ID of the runner. `StoredRunnerContext::current` fills it from
    /// `std::process::id()`; `from_runtime` copies it from the runtime context.
    pub pid: u32,
    /// Hostname of the machine running the runner. `StoredRunnerContext::current`
    /// stores `"unknown"` when the hostname cannot be read.
    pub hostname: String,
    /// Numeric thread identifier. `StoredRunnerContext::current` derives it on a
    /// best-effort basis and stores 0 when it cannot be parsed.
    pub thread_id: u64,
    /// UTC timestamp. Both `current` and `from_runtime` set it to `Utc::now()`
    /// at construction time.
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// Parent runner ID, if this is a child worker.
    pub parent_runner_id: Option<String>,
    /// Parent runner class name.
    pub parent_runner_cls: Option<String>,
}
34
35impl StoredRunnerContext {
36    pub fn current(runner_id: impl Into<String>, runner_cls: impl Into<String>) -> Self {
37        let thread_id = format!("{:?}", std::thread::current().id());
38        let thread_num: u64 = thread_id
39            .trim_start_matches("ThreadId(")
40            .trim_end_matches(')')
41            .parse()
42            .unwrap_or(0);
43        Self {
44            runner_cls: runner_cls.into(),
45            runner_id: runner_id.into(),
46            pid: std::process::id(),
47            hostname: hostname::get().map_or_else(
48                |_| "unknown".to_string(),
49                |h| h.to_string_lossy().into_owned(),
50            ),
51            thread_id: thread_num,
52            started_at: chrono::Utc::now(),
53            parent_runner_id: None,
54            parent_runner_cls: None,
55        }
56    }
57
58    /// Create a child runner context with this context as parent.
59    pub fn new_child(
60        &self,
61        child_runner_id: impl Into<String>,
62        child_runner_cls: impl Into<String>,
63    ) -> Self {
64        let mut child = Self::current(child_runner_id, child_runner_cls);
65        child.parent_runner_id = Some(self.runner_id.clone());
66        child.parent_runner_cls = Some(self.runner_cls.clone());
67        child
68    }
69
70    /// Convert a runtime `RunnerContext` into a `StoredRunnerContext`.
71    ///
72    /// Maps the runtime context (used in task-local/thread-local storage) to
73    /// the persistent format stored in the state backend for monitoring.
74    pub fn from_runtime(ctx: &RunnerContext) -> Self {
75        Self {
76            runner_cls: ctx.runner_cls.as_ref().to_string(),
77            runner_id: ctx.runner_id.to_string(),
78            pid: ctx.pid,
79            hostname: ctx.hostname.clone(),
80            thread_id: ctx.thread_id,
81            started_at: chrono::Utc::now(),
82            parent_runner_id: ctx.parent_ctx.as_ref().map(|p| p.runner_id.to_string()),
83            parent_runner_cls: ctx
84                .parent_ctx
85                .as_ref()
86                .map(|p| p.runner_cls.as_ref().to_string()),
87        }
88    }
89}
90
91/// State backend interface — persistence of invocations and results.
92///
93/// Mirrors pynenc's `BaseStateBackend`. This is a composite trait combining three sub-traits:
94/// - [`StateBackendCore`] — invocation/result/history storage, cleanup, introspection
95/// - [`StateBackendQuery`] — workflow queries, workflow discovery, workflow data
96/// - [`StateBackendRunner`] — runner context, runner analytics, time-range queries
97///
98/// Implementations should implement the sub-traits directly.
99/// This supertrait is auto-implemented via a blanket impl.
100pub trait StateBackend: StateBackendCore + StateBackendQuery + StateBackendRunner {}
101
102impl<T: StateBackendCore + StateBackendQuery + StateBackendRunner> StateBackend for T {}
103
104// ===========================================================================
105// Sub-trait: StateBackendCore
106// ===========================================================================
107
108/// Core state persistence — invocation storage, results, history, and cleanup.
109///
110/// All methods in this sub-trait are required (no defaults).
111#[async_trait]
112pub trait StateBackendCore: Send + Sync {
113    // --- Invocation storage ---
114
115    /// Store or update an invocation and its associated call.
116    async fn upsert_invocation(
117        &self,
118        invocation: &InvocationDTO,
119        call: &CallDTO,
120    ) -> RustvelloResult<()>;
121
122    /// Retrieve an invocation by ID.
123    async fn get_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<InvocationDTO>;
124
125    /// Retrieve a call by ID.
126    async fn get_call(&self, call_id: &CallId) -> RustvelloResult<CallDTO>;
127
128    // --- Result storage ---
129
130    /// Store the result of a successful invocation.
131    async fn store_result(&self, invocation_id: &InvocationId, result: &str)
132        -> RustvelloResult<()>;
133
134    /// Retrieve the result of a completed invocation.
135    async fn get_result(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<String>>;
136
137    /// Store error information for a failed invocation.
138    async fn store_error(
139        &self,
140        invocation_id: &InvocationId,
141        error: &TaskError,
142    ) -> RustvelloResult<()>;
143
144    /// Retrieve error information for a failed invocation.
145    async fn get_error(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<TaskError>>;
146
147    // --- History ---
148
149    /// Record a status change in the audit log.
150    async fn add_history(&self, history: &InvocationHistory) -> RustvelloResult<()>;
151
152    /// Get the full status history for an invocation.
153    async fn get_history(
154        &self,
155        invocation_id: &InvocationId,
156    ) -> RustvelloResult<Vec<InvocationHistory>>;
157
158    // --- Cleanup ---
159
160    /// Purge all stored data.
161    async fn purge(&self) -> RustvelloResult<()>;
162
163    /// Human-readable name of this backend implementation.
164    fn backend_name(&self) -> &'static str {
165        "Unknown"
166    }
167
168    /// Key-value statistics about this backend's current state.
169    async fn usage_stats(&self) -> Vec<(&'static str, String)> {
170        Vec::new()
171    }
172}
173
174// ===========================================================================
175// Sub-trait: StateBackendQuery
176// ===========================================================================
177
178/// Workflow queries, discovery, and data.
179#[async_trait]
180pub trait StateBackendQuery: Send + Sync {
181    // --- Workflow queries ---
182
183    /// Get all invocation IDs that belong to a workflow.
184    async fn get_workflow_invocations(
185        &self,
186        workflow_id: &InvocationId,
187    ) -> RustvelloResult<Vec<InvocationId>>;
188
189    /// Get direct child invocations of a parent invocation.
190    async fn get_child_invocations(
191        &self,
192        parent_invocation_id: &InvocationId,
193    ) -> RustvelloResult<Vec<InvocationId>>;
194
195    // --- Workflow discovery ---
196
197    /// Store a workflow run for tracking and monitoring.
198    async fn store_workflow_run(
199        &self,
200        workflow: &rustvello_proto::invocation::WorkflowIdentity,
201    ) -> RustvelloResult<()>;
202
203    /// Retrieve all distinct workflow types (task IDs that have started workflows).
204    async fn get_all_workflow_types(
205        &self,
206    ) -> RustvelloResult<Vec<rustvello_proto::identifiers::TaskId>>;
207
208    /// Retrieve workflow run identities for a specific workflow type.
209    async fn get_workflow_runs(
210        &self,
211        workflow_type: &rustvello_proto::identifiers::TaskId,
212    ) -> RustvelloResult<Vec<rustvello_proto::invocation::WorkflowIdentity>>;
213
214    // --- Workflow data (key-value store scoped to a workflow) ---
215
216    /// Set a value in the workflow's key-value data store.
217    async fn set_workflow_data(
218        &self,
219        workflow_id: &InvocationId,
220        key: &str,
221        value: &str,
222    ) -> RustvelloResult<()>;
223
224    /// Get a value from the workflow's key-value data store.
225    async fn get_workflow_data(
226        &self,
227        workflow_id: &InvocationId,
228        key: &str,
229    ) -> RustvelloResult<Option<String>>;
230
231    // --- App info storage ---
232
233    /// Store application info as opaque JSON.
234    async fn store_app_info(&self, app_id: &str, info_json: &str) -> RustvelloResult<()>;
235
236    /// Get application info by ID.
237    async fn get_app_info(&self, app_id: &str) -> RustvelloResult<Option<String>>;
238
239    /// Get all stored application infos as (app_id, info_json) pairs.
240    async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>>;
241
242    // --- Workflow sub-invocations ---
243
244    /// Record a sub-invocation belonging to a workflow.
245    async fn store_workflow_sub_invocation(
246        &self,
247        workflow_id: &InvocationId,
248        sub_inv_id: &InvocationId,
249    ) -> RustvelloResult<()>;
250
251    /// Get all sub-invocations for a workflow.
252    async fn get_workflow_sub_invocations(
253        &self,
254        workflow_id: &InvocationId,
255    ) -> RustvelloResult<Vec<InvocationId>>;
256
257    // --- Aggregated workflow queries ---
258
259    /// Get all workflow runs across all types.
260    ///
261    /// Default implementation calls `get_all_workflow_types()` then
262    /// `get_workflow_runs()` for each. Backends can override for efficiency.
263    async fn get_all_workflow_runs(
264        &self,
265    ) -> RustvelloResult<Vec<rustvello_proto::invocation::WorkflowIdentity>> {
266        let types = self.get_all_workflow_types().await?;
267        let mut all = Vec::new();
268        for t in &types {
269            let runs = self.get_workflow_runs(t).await?;
270            all.extend(runs);
271        }
272        Ok(all)
273    }
274}
275
276// ===========================================================================
277// Sub-trait: StateBackendRunner
278// ===========================================================================
279
280/// Runner context storage, analytics, and time-range queries.
281#[async_trait]
282pub trait StateBackendRunner: Send + Sync {
283    // --- Runner context ---
284
285    /// Store a runner's execution context for monitoring.
286    async fn store_runner_context(&self, context: &StoredRunnerContext) -> RustvelloResult<()>;
287
288    /// Retrieve a runner's execution context.
289    async fn get_runner_context(
290        &self,
291        runner_id: &str,
292    ) -> RustvelloResult<Option<StoredRunnerContext>>;
293
294    /// Get all runner contexts whose `parent_runner_id` matches the given runner.
295    async fn get_runner_contexts_by_parent(
296        &self,
297        parent_runner_id: &str,
298    ) -> RustvelloResult<Vec<StoredRunnerContext>>;
299
300    /// Get invocation IDs that were processed by a specific runner (by runner_id in history).
301    ///
302    /// Returns paginated results. `limit` of 0 means no limit.
303    async fn get_invocation_ids_by_runner(
304        &self,
305        runner_id: &str,
306        limit: usize,
307        offset: usize,
308    ) -> RustvelloResult<Vec<InvocationId>>;
309
310    /// Count invocations that were processed by a specific runner.
311    async fn count_invocations_by_runner(&self, runner_id: &str) -> RustvelloResult<usize>;
312
313    // --- Time-range queries ---
314
315    /// Get history entries within a time range, for monitoring log explorers.
316    async fn get_history_in_timerange(
317        &self,
318        start: chrono::DateTime<chrono::Utc>,
319        end: chrono::DateTime<chrono::Utc>,
320        limit: usize,
321        offset: usize,
322    ) -> RustvelloResult<Vec<InvocationHistory>>;
323
324    // --- Matching runner contexts ---
325
326    /// Get runner contexts whose runner_id contains the given partial string.
327    ///
328    /// Used for prefix/partial matching on runner IDs.
329    async fn get_matching_runner_contexts(
330        &self,
331        partial_id: &str,
332    ) -> RustvelloResult<Vec<StoredRunnerContext>>;
333}