use chrono::Utc;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use crate::db::queries::result_store::{self as queries, ResultStoreRow};
use crate::db::DbPool;
use crate::error::{AppError, AppResult};
use crate::snowflake::SnowflakeGenerator;
/// Request body accepted by `ResultStoreService::put`.
#[derive(Debug, Deserialize)]
pub struct PutResultBody {
/// Result name; becomes the `{name}` segment of the returned
/// `noetl://execution/{execution_id}/result/{name}/{result_id}` reference.
pub name: String,
/// JSON payload to store; `put` records the length and SHA-256 of its
/// serialised bytes.
pub data: serde_json::Value,
/// Scope label copied onto the stored row and echoed in the response;
/// `"execution"` when the field is absent.
#[serde(default = "default_scope")]
pub scope: String,
/// Optional identifier of the producing step; copied onto the stored row.
pub source_step: Option<String>,
// NOTE(review): not read by `ResultStoreService::put` in this file — the
// response always reports `store: "db"`. Confirm whether it is honoured elsewhere.
#[serde(default)]
pub store: Option<String>,
// NOTE(review): not read by `put` in this file; the row and response are
// written with `expires_at: None`.
#[serde(default)]
pub ttl: Option<String>,
// NOTE(review): not read by `put` in this file.
#[serde(default)]
pub correlation: Option<serde_json::Value>,
// NOTE(review): not read by `put` in this file; the row is written with media
// type `application/json` regardless of this flag.
#[serde(default)]
pub compress: bool,
}
/// Serde default for `PutResultBody::scope`: a result is execution-scoped
/// unless the request says otherwise.
fn default_scope() -> String {
    String::from("execution")
}
/// Response returned by `ResultStoreService::put` after a result is stored.
#[derive(Debug, Serialize)]
pub struct ResultPutResponse {
/// `noetl://execution/{execution_id}/result/{name}/{result_id}` reference to
/// the stored row (the raw identifier is needed because `ref` is a keyword).
pub r#ref: String,
/// Backing store name; `put` always reports `"db"`.
pub store: String,
/// Scope copied from the request body.
pub scope: String,
/// Length in bytes of the JSON-serialised payload.
pub bytes: u64,
/// Hex-encoded SHA-256 digest of the serialised payload; `put` always sets it.
pub sha256: Option<String>,
/// Expiry timestamp; `put` currently always returns `None`.
pub expires_at: Option<String>,
}
/// A parsed `noetl://execution/{execution_id}/result/{name}/{result_id}` reference.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NoetlRef {
    /// Numeric id taken from the `execution/{id}` segment.
    pub execution_id: i64,
    /// Result name. It may itself contain `/`: every segment between the
    /// `result` marker and the final `result_id` segment is joined back together.
    pub name: String,
    /// Numeric id taken from the final path segment.
    pub result_id: i64,
}

/// Parses a `noetl://execution/{execution_id}/result/{name}/{result_id}` URI.
///
/// Because `name` may contain `/`, the URI needs *at least* five path segments.
/// The last segment is always the result id, and everything between `result`
/// and that last segment is the name.
///
/// # Errors
/// Returns a human-readable message when:
/// - the scheme is not `noetl://`;
/// - there are fewer than five path segments;
/// - the first segment is not `execution` or the third is not `result`;
/// - either id fails to parse as `i64`;
/// - the name is empty.
pub fn parse_noetl_ref(s: &str) -> Result<NoetlRef, String> {
    let path = s
        .strip_prefix("noetl://")
        .ok_or_else(|| format!("URI must start with 'noetl://', got: {s:?}"))?;
    let parts: Vec<&str> = path.split('/').collect();

    // One pattern enforces the minimum segment count. The guard demands at
    // least one name segment, so fewer than five segments falls through to
    // the error arm.
    let (kind, execution_id_str, marker, name_parts, result_id_str) = match parts.as_slice() {
        [kind, execution_id_str, marker, name_parts @ .., result_id_str]
            if !name_parts.is_empty() =>
        {
            (*kind, *execution_id_str, *marker, name_parts, *result_id_str)
        }
        _ => {
            return Err(format!(
                "URI must have at least 5 path segments after 'noetl://', got {}: {s:?}",
                parts.len()
            ))
        }
    };

    if kind != "execution" {
        return Err(format!(
            "First path segment must be 'execution', got {:?} in {s:?}",
            kind
        ));
    }
    if marker != "result" {
        return Err(format!(
            "Third path segment must be 'result', got {:?} in {s:?}",
            marker
        ));
    }

    let execution_id = execution_id_str
        .parse::<i64>()
        .map_err(|_| format!("execution_id segment {:?} is not an i64 in {s:?}", execution_id_str))?;
    let result_id = result_id_str
        .parse::<i64>()
        .map_err(|_| format!("result_id segment {:?} is not an i64 in {s:?}", result_id_str))?;

    // A single empty segment (`.../result//{id}`) joins to "" and is rejected.
    let name = name_parts.join("/");
    if name.is_empty() {
        return Err(format!("name segment is empty in URI {s:?}"));
    }

    Ok(NoetlRef {
        execution_id,
        name,
        result_id,
    })
}
#[derive(Clone)]
pub struct ResultStoreService {
pool: DbPool,
snowflake: Arc<SnowflakeGenerator>,
}
impl ResultStoreService {
pub fn new(pool: DbPool, snowflake: Arc<SnowflakeGenerator>) -> Self {
Self { pool, snowflake }
}
pub async fn put(
&self,
execution_id: i64,
body: &PutResultBody,
) -> AppResult<ResultPutResponse> {
let serialised = serde_json::to_vec(&body.data)
.map_err(|e| AppError::Internal(format!("result_store.put: serialise: {e}")))?;
let bytes = serialised.len() as i64;
let sha256_hex = hex::encode(Sha256::digest(&serialised));
let result_id = self
.snowflake
.generate()
.map_err(|e| AppError::Internal(format!("result_store.put: snowflake: {e}")))?;
let noetl_ref = format!(
"noetl://execution/{}/result/{}/{}",
execution_id, body.name, result_id
);
let row = ResultStoreRow {
result_id,
execution_id,
name: body.name.clone(),
scope: body.scope.clone(),
source_step: body.source_step.clone(),
data: body.data.clone(),
bytes,
sha256: sha256_hex.clone(),
media_type: "application/json".to_string(),
created_at: Utc::now(),
expires_at: None,
};
queries::insert(&self.pool, &row).await?;
Ok(ResultPutResponse {
r#ref: noetl_ref,
store: "db".to_string(),
scope: body.scope.clone(),
bytes: bytes as u64,
sha256: Some(sha256_hex),
expires_at: None,
})
}
pub async fn resolve(&self, noetl_ref: &NoetlRef) -> AppResult<Option<serde_json::Value>> {
let row = queries::get_by_ref(
&self.pool,
noetl_ref.execution_id,
&noetl_ref.name,
noetl_ref.result_id,
)
.await?;
Ok(row.map(|r| r.data))
}
}
/// Unit tests for the pure helpers in this module. The DB-backed service
/// methods are not exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_standard_worker_emit() {
        let r = parse_noetl_ref("noetl://execution/7654321/result/my_step/1234567890")
            .expect("valid URI must parse");
        assert_eq!(r.execution_id, 7654321);
        assert_eq!(r.name, "my_step");
        assert_eq!(r.result_id, 1234567890);
    }

    #[test]
    fn parses_step_name_with_slash() {
        let r = parse_noetl_ref("noetl://execution/1/result/a/b/999")
            .expect("slash in name must parse");
        assert_eq!(r.execution_id, 1);
        assert_eq!(r.name, "a/b");
        assert_eq!(r.result_id, 999);
    }

    #[test]
    fn name_may_equal_marker_word() {
        let r = parse_noetl_ref("noetl://execution/3/result/result/4")
            .expect("a name of 'result' must parse");
        assert_eq!(r.name, "result");
        assert_eq!(r.result_id, 4);
    }

    #[test]
    fn rejects_wrong_scheme() {
        assert!(parse_noetl_ref("http://execution/1/result/step/2").is_err());
    }

    #[test]
    fn rejects_too_few_segments() {
        assert!(parse_noetl_ref("noetl://execution/1/result").is_err());
    }

    #[test]
    fn rejects_missing_name_segment() {
        // Four segments: the trailing one is the result_id, leaving no name at all.
        assert!(parse_noetl_ref("noetl://execution/1/result/2").is_err());
    }

    #[test]
    fn rejects_empty_name() {
        assert!(parse_noetl_ref("noetl://execution/1/result//2").is_err());
    }

    #[test]
    fn rejects_trailing_slash() {
        // The empty final segment cannot be parsed as a result_id.
        assert!(parse_noetl_ref("noetl://execution/1/result/step/2/").is_err());
    }

    #[test]
    fn rejects_non_numeric_execution_id() {
        assert!(parse_noetl_ref("noetl://execution/abc/result/step/1").is_err());
    }

    #[test]
    fn rejects_non_numeric_result_id() {
        assert!(parse_noetl_ref("noetl://execution/1/result/step/xyz").is_err());
    }

    #[test]
    fn rejects_wrong_first_segment() {
        assert!(parse_noetl_ref("noetl://workflow/1/result/step/2").is_err());
    }

    #[test]
    fn rejects_wrong_third_segment() {
        assert!(parse_noetl_ref("noetl://execution/1/artifact/step/2").is_err());
    }

    #[test]
    fn uri_format_round_trips_through_parser() {
        let eid: i64 = 9876543210;
        let name = "output_select";
        let result_id: i64 = 1122334455;
        let uri = format!("noetl://execution/{eid}/result/{name}/{result_id}");
        let parsed = parse_noetl_ref(&uri).unwrap();
        assert_eq!(parsed.execution_id, eid);
        assert_eq!(parsed.name, name);
        assert_eq!(parsed.result_id, result_id);
    }

    #[test]
    fn scope_defaults_to_execution() {
        assert_eq!(default_scope(), "execution");
    }

    #[test]
    fn serialise_and_hash_are_deterministic() {
        let data = serde_json::json!({"rows": [1, 2, 3], "columns": ["a"]});
        let bytes = serde_json::to_vec(&data).unwrap();
        let hash = hex::encode(Sha256::digest(&bytes));
        let bytes2 = serde_json::to_vec(&data).unwrap();
        let hash2 = hex::encode(Sha256::digest(&bytes2));
        assert_eq!(hash, hash2);
        assert!(!hash.is_empty());
        // SHA-256 is 32 bytes, i.e. 64 hex characters.
        assert_eq!(hash.len(), 64);
        assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
    }
}