hyperstack-interpreter 0.6.9

AST transformation runtime and VM for HyperStack streaming pipelines
Documentation
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;

use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::ast::{HttpMethod, ResolverType};
use crate::compiler::MultiEntityBytecode;
use crate::resolvers::{TokenMetadataResolverClient, UrlResolverClient};
use crate::vm::{ResolverRequest, VmContext};
use crate::Mutation;

pub type ResolverBatchResult =
    Result<HashMap<String, Value>, Box<dyn std::error::Error + Send + Sync>>;
pub type ResolverBatchFuture<'a> = Pin<Box<dyn Future<Output = ResolverBatchResult> + Send + 'a>>;
pub type ResolverApplyFuture<'a> = Pin<Box<dyn Future<Output = Vec<Mutation>> + Send + 'a>>;
pub type SharedRuntimeResolver = std::sync::Arc<dyn RuntimeResolver>;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RuntimeResolverRequest {
    TokenMetadata {
        key: String,
        mint: String,
    },
    UrlJson {
        key: String,
        url: String,
        method: HttpMethod,
    },
}

impl RuntimeResolverRequest {
    pub fn key(&self) -> &str {
        match self {
            Self::TokenMetadata { key, .. } | Self::UrlJson { key, .. } => key,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeResolverBatchRequest {
    pub requests: Vec<RuntimeResolverRequest>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeResolverResponse {
    pub key: String,
    pub value: Value,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeResolverBatchResponse {
    #[serde(default)]
    pub resolved: Vec<RuntimeResolverResponse>,
}

#[derive(Debug)]
struct PendingRuntimeResolverRequest {
    request: ResolverRequest,
    backend_request: RuntimeResolverRequest,
}

pub trait RuntimeResolver: Send + Sync {
    fn resolve_batch<'a>(
        &'a self,
        requests: &'a [RuntimeResolverRequest],
    ) -> ResolverBatchFuture<'a>;

    fn resolve_and_apply<'a>(
        &'a self,
        vm: &'a std::sync::Mutex<VmContext>,
        bytecode: &'a MultiEntityBytecode,
        requests: Vec<ResolverRequest>,
    ) -> ResolverApplyFuture<'a> {
        Box::pin(async move {
            if requests.is_empty() {
                return Vec::new();
            }

            let mut cached = Vec::new();
            let mut pending = Vec::new();
            let mut invalid = Vec::new();

            {
                let mut vm_guard = vm.lock().unwrap_or_else(|e| e.into_inner());

                for request in requests {
                    let canonical_key =
                        runtime_resolver_cache_key(&request.resolver, &request.input);

                    if let Some(resolved_value) = vm_guard.get_cached_resolver_value(&canonical_key)
                    {
                        cached.push((request, resolved_value));
                        continue;
                    }

                    match runtime_request_from_vm_request(&request) {
                        Some(backend_request) => pending.push(PendingRuntimeResolverRequest {
                            request,
                            backend_request,
                        }),
                        None => invalid.push(request),
                    }
                }

                if !invalid.is_empty() {
                    vm_guard.restore_resolver_requests(invalid);
                }
            }

            let resolved_map = if pending.is_empty() {
                Ok(HashMap::new())
            } else {
                let mut unique = HashMap::new();
                for entry in &pending {
                    unique
                        .entry(entry.backend_request.key().to_string())
                        .or_insert_with(|| entry.backend_request.clone());
                }

                let unique_requests: Vec<RuntimeResolverRequest> = unique.into_values().collect();
                self.resolve_batch(&unique_requests).await
            };

            let mut mutations = Vec::new();
            let mut failed = Vec::new();
            let mut vm_guard = vm.lock().unwrap_or_else(|e| e.into_inner());

            for (request, resolved_value) in cached {
                match vm_guard.apply_resolver_result(bytecode, &request.cache_key, resolved_value) {
                    Ok(mut new_mutations) => mutations.append(&mut new_mutations),
                    Err(err) => {
                        tracing::warn!(
                            cache_key = %request.cache_key,
                            error = %err,
                            "Failed to apply cached resolver result"
                        );
                        failed.push(request);
                    }
                }
            }

            match resolved_map {
                Ok(resolved_map) => {
                    for entry in pending {
                        match resolved_map.get(entry.backend_request.key()) {
                            Some(resolved_value) => match vm_guard.apply_resolver_result(
                                bytecode,
                                &entry.request.cache_key,
                                resolved_value.clone(),
                            ) {
                                Ok(mut new_mutations) => mutations.append(&mut new_mutations),
                                Err(err) => {
                                    tracing::warn!(
                                        cache_key = %entry.request.cache_key,
                                        error = %err,
                                        "Failed to apply resolver result"
                                    );
                                    failed.push(entry.request);
                                }
                            },
                            None => failed.push(entry.request),
                        }
                    }
                }
                Err(err) => {
                    tracing::warn!(error = %err, "Runtime resolver backend request failed");
                    failed.extend(pending.into_iter().map(|entry| entry.request));
                }
            }

            if !failed.is_empty() {
                vm_guard.restore_resolver_requests(failed);
            }

            mutations
        })
    }
}

pub struct InProcessResolver {
    token_client: Option<TokenMetadataResolverClient>,
    url_client: UrlResolverClient,
}

impl InProcessResolver {
    pub fn from_env() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        Ok(Self {
            token_client: TokenMetadataResolverClient::from_env()?,
            url_client: UrlResolverClient::new(),
        })
    }

    pub fn new(
        token_client: Option<TokenMetadataResolverClient>,
        url_client: UrlResolverClient,
    ) -> Self {
        Self {
            token_client,
            url_client,
        }
    }

    pub async fn resolve_batch_internal(
        &self,
        requests: &[RuntimeResolverRequest],
    ) -> ResolverBatchResult {
        let mut results = HashMap::new();
        let mut token_requests = Vec::new();
        let mut url_requests = Vec::new();

        for request in requests {
            match request {
                RuntimeResolverRequest::TokenMetadata { key, mint } => {
                    token_requests.push((key.clone(), mint.clone()));
                }
                RuntimeResolverRequest::UrlJson { key, url, method } => {
                    url_requests.push((key.clone(), url.clone(), method.clone()));
                }
            }
        }

        if !token_requests.is_empty() {
            if let Some(token_client) = &self.token_client {
                let mints: Vec<String> = token_requests
                    .iter()
                    .map(|(_, mint)| mint.clone())
                    .collect();
                match token_client.resolve_token_metadata(&mints).await {
                    Ok(resolved) => {
                        for (key, mint) in token_requests {
                            if let Some(value) = resolved.get(&mint) {
                                results.insert(key, value.clone());
                            }
                        }
                    }
                    Err(err) => {
                        tracing::warn!(error = %err, "Failed to resolve token metadata batch");
                    }
                }
            } else {
                tracing::warn!(
                    count = token_requests.len(),
                    "DAS_API_ENDPOINT not set; token resolver requests will be re-queued"
                );
            }
        }

        if !url_requests.is_empty() {
            let mut unique = HashMap::new();
            for (key, url, method) in &url_requests {
                unique
                    .entry((url.clone(), method.clone()))
                    .or_insert_with(Vec::new)
                    .push(key.clone());
            }

            let batch_input: Vec<(String, HttpMethod)> = unique.keys().cloned().collect();
            let resolved = self.url_client.resolve_batch(&batch_input).await;

            for ((url, method), keys) in unique {
                if let Some(value) = resolved.get(&(url, method)) {
                    for key in keys {
                        results.insert(key, value.clone());
                    }
                }
            }
        }

        Ok(results)
    }
}

impl RuntimeResolver for InProcessResolver {
    fn resolve_batch<'a>(
        &'a self,
        requests: &'a [RuntimeResolverRequest],
    ) -> ResolverBatchFuture<'a> {
        Box::pin(async move { self.resolve_batch_internal(requests).await })
    }
}

pub fn runtime_resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
    crate::vm::resolver_cache_key(resolver, input)
}

fn runtime_request_from_vm_request(request: &ResolverRequest) -> Option<RuntimeResolverRequest> {
    match &request.resolver {
        ResolverType::Token => extract_mint_from_input(&request.input).map(|mint| {
            RuntimeResolverRequest::TokenMetadata {
                key: request.cache_key.clone(),
                mint,
            }
        }),
        ResolverType::Url(config) => match &request.input {
            Value::String(url) if !url.is_empty() => Some(RuntimeResolverRequest::UrlJson {
                key: request.cache_key.clone(),
                url: url.clone(),
                method: config.method.clone(),
            }),
            _ => None,
        },
    }
}

fn extract_mint_from_input(input: &Value) -> Option<String> {
    match input {
        Value::String(value) if !value.is_empty() => Some(value.clone()),
        Value::Object(map) => map
            .get("mint")
            .and_then(|value| value.as_str())
            .filter(|value| !value.is_empty())
            .map(str::to_string),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn token_request_extracts_mint_from_object_input() {
        let request = ResolverRequest {
            cache_key: "token:mint".to_string(),
            resolver: ResolverType::Token,
            input: serde_json::json!({ "mint": "abc" }),
        };

        let runtime_request = runtime_request_from_vm_request(&request).unwrap();
        assert_eq!(
            runtime_request,
            RuntimeResolverRequest::TokenMetadata {
                key: "token:mint".to_string(),
                mint: "abc".to_string(),
            }
        );
    }

    #[test]
    fn url_request_uses_existing_cache_key() {
        let request = ResolverRequest {
            cache_key: "url:get:https://example.com".to_string(),
            resolver: ResolverType::Url(crate::ast::UrlResolverConfig {
                url_source: crate::ast::UrlSource::FieldPath("metadata_url".to_string()),
                method: HttpMethod::Get,
                extract_path: None,
            }),
            input: serde_json::json!("https://example.com"),
        };

        let runtime_request = runtime_request_from_vm_request(&request).unwrap();
        assert_eq!(
            runtime_request,
            RuntimeResolverRequest::UrlJson {
                key: "url:get:https://example.com".to_string(),
                url: "https://example.com".to_string(),
                method: HttpMethod::Get,
            }
        );
    }
}