use async_trait::async_trait;
use rustvello_proto::call::CallDTO;
use rustvello_proto::identifiers::{CallId, InvocationId};
use rustvello_proto::invocation::{InvocationDTO, InvocationHistory};
use crate::context::RunnerContext;
use crate::error::{RustvelloResult, TaskError};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StoredRunnerContext {
pub runner_cls: String,
pub runner_id: String,
pub pid: u32,
pub hostname: String,
pub thread_id: u64,
pub started_at: chrono::DateTime<chrono::Utc>,
pub parent_runner_id: Option<String>,
pub parent_runner_cls: Option<String>,
}
impl StoredRunnerContext {
pub fn current(runner_id: impl Into<String>, runner_cls: impl Into<String>) -> Self {
let thread_id = format!("{:?}", std::thread::current().id());
let thread_num: u64 = thread_id
.trim_start_matches("ThreadId(")
.trim_end_matches(')')
.parse()
.unwrap_or(0);
Self {
runner_cls: runner_cls.into(),
runner_id: runner_id.into(),
pid: std::process::id(),
hostname: hostname::get().map_or_else(
|_| "unknown".to_string(),
|h| h.to_string_lossy().into_owned(),
),
thread_id: thread_num,
started_at: chrono::Utc::now(),
parent_runner_id: None,
parent_runner_cls: None,
}
}
pub fn new_child(
&self,
child_runner_id: impl Into<String>,
child_runner_cls: impl Into<String>,
) -> Self {
let mut child = Self::current(child_runner_id, child_runner_cls);
child.parent_runner_id = Some(self.runner_id.clone());
child.parent_runner_cls = Some(self.runner_cls.clone());
child
}
pub fn from_runtime(ctx: &RunnerContext) -> Self {
Self {
runner_cls: ctx.runner_cls.as_ref().to_string(),
runner_id: ctx.runner_id.to_string(),
pid: ctx.pid,
hostname: ctx.hostname.clone(),
thread_id: ctx.thread_id,
started_at: chrono::Utc::now(),
parent_runner_id: ctx.parent_ctx.as_ref().map(|p| p.runner_id.to_string()),
parent_runner_cls: ctx
.parent_ctx
.as_ref()
.map(|p| p.runner_cls.as_ref().to_string()),
}
}
}
pub trait StateBackend: StateBackendCore + StateBackendQuery + StateBackendRunner {}
impl<T: StateBackendCore + StateBackendQuery + StateBackendRunner> StateBackend for T {}
#[async_trait]
pub trait StateBackendCore: Send + Sync {
async fn upsert_invocation(
&self,
invocation: &InvocationDTO,
call: &CallDTO,
) -> RustvelloResult<()>;
async fn get_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<InvocationDTO>;
async fn get_call(&self, call_id: &CallId) -> RustvelloResult<CallDTO>;
async fn store_result(&self, invocation_id: &InvocationId, result: &str)
-> RustvelloResult<()>;
async fn get_result(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<String>>;
async fn store_error(
&self,
invocation_id: &InvocationId,
error: &TaskError,
) -> RustvelloResult<()>;
async fn get_error(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<TaskError>>;
async fn add_history(&self, history: &InvocationHistory) -> RustvelloResult<()>;
async fn get_history(
&self,
invocation_id: &InvocationId,
) -> RustvelloResult<Vec<InvocationHistory>>;
async fn purge(&self) -> RustvelloResult<()>;
fn backend_name(&self) -> &'static str {
"Unknown"
}
async fn usage_stats(&self) -> Vec<(&'static str, String)> {
Vec::new()
}
}
#[async_trait]
pub trait StateBackendQuery: Send + Sync {
async fn get_workflow_invocations(
&self,
workflow_id: &InvocationId,
) -> RustvelloResult<Vec<InvocationId>>;
async fn get_child_invocations(
&self,
parent_invocation_id: &InvocationId,
) -> RustvelloResult<Vec<InvocationId>>;
async fn store_workflow_run(
&self,
workflow: &rustvello_proto::invocation::WorkflowIdentity,
) -> RustvelloResult<()>;
async fn get_all_workflow_types(
&self,
) -> RustvelloResult<Vec<rustvello_proto::identifiers::TaskId>>;
async fn get_workflow_runs(
&self,
workflow_type: &rustvello_proto::identifiers::TaskId,
) -> RustvelloResult<Vec<rustvello_proto::invocation::WorkflowIdentity>>;
async fn set_workflow_data(
&self,
workflow_id: &InvocationId,
key: &str,
value: &str,
) -> RustvelloResult<()>;
async fn get_workflow_data(
&self,
workflow_id: &InvocationId,
key: &str,
) -> RustvelloResult<Option<String>>;
async fn store_app_info(&self, app_id: &str, info_json: &str) -> RustvelloResult<()>;
async fn get_app_info(&self, app_id: &str) -> RustvelloResult<Option<String>>;
async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>>;
async fn store_workflow_sub_invocation(
&self,
workflow_id: &InvocationId,
sub_inv_id: &InvocationId,
) -> RustvelloResult<()>;
async fn get_workflow_sub_invocations(
&self,
workflow_id: &InvocationId,
) -> RustvelloResult<Vec<InvocationId>>;
async fn get_all_workflow_runs(
&self,
) -> RustvelloResult<Vec<rustvello_proto::invocation::WorkflowIdentity>> {
let types = self.get_all_workflow_types().await?;
let mut all = Vec::new();
for t in &types {
let runs = self.get_workflow_runs(t).await?;
all.extend(runs);
}
Ok(all)
}
}
#[async_trait]
pub trait StateBackendRunner: Send + Sync {
async fn store_runner_context(&self, context: &StoredRunnerContext) -> RustvelloResult<()>;
async fn get_runner_context(
&self,
runner_id: &str,
) -> RustvelloResult<Option<StoredRunnerContext>>;
async fn get_runner_contexts_by_parent(
&self,
parent_runner_id: &str,
) -> RustvelloResult<Vec<StoredRunnerContext>>;
async fn get_invocation_ids_by_runner(
&self,
runner_id: &str,
limit: usize,
offset: usize,
) -> RustvelloResult<Vec<InvocationId>>;
async fn count_invocations_by_runner(&self, runner_id: &str) -> RustvelloResult<usize>;
async fn get_history_in_timerange(
&self,
start: chrono::DateTime<chrono::Utc>,
end: chrono::DateTime<chrono::Utc>,
limit: usize,
offset: usize,
) -> RustvelloResult<Vec<InvocationHistory>>;
async fn get_matching_runner_contexts(
&self,
partial_id: &str,
) -> RustvelloResult<Vec<StoredRunnerContext>>;
}