Skip to main content

hyperstack_interpreter/
runtime_resolvers.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::ast::{HttpMethod, ResolverType};
9use crate::compiler::MultiEntityBytecode;
10use crate::resolvers::{TokenMetadataResolverClient, UrlResolverClient};
11use crate::vm::{ResolverRequest, VmContext};
12use crate::Mutation;
13
/// Outcome of a batch resolution: resolved values keyed by request key on
/// success, or a boxed, thread-safe error on failure.
pub type ResolverBatchResult =
    Result<HashMap<String, Value>, Box<dyn std::error::Error + Send + Sync>>;
/// Boxed, pinned, `Send` future yielding a [`ResolverBatchResult`].
pub type ResolverBatchFuture<'a> = Pin<Box<dyn Future<Output = ResolverBatchResult> + Send + 'a>>;
/// Boxed, pinned, `Send` future yielding the mutations produced by applying
/// resolver results.
pub type ResolverApplyFuture<'a> = Pin<Box<dyn Future<Output = Vec<Mutation>> + Send + 'a>>;
/// Reference-counted handle to a dynamically dispatched [`RuntimeResolver`].
pub type SharedRuntimeResolver = std::sync::Arc<dyn RuntimeResolver>;
19
20#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
21#[serde(tag = "type", rename_all = "snake_case")]
22pub enum RuntimeResolverRequest {
23    TokenMetadata {
24        key: String,
25        mint: String,
26    },
27    UrlJson {
28        key: String,
29        url: String,
30        method: HttpMethod,
31    },
32}
33
34impl RuntimeResolverRequest {
35    pub fn key(&self) -> &str {
36        match self {
37            Self::TokenMetadata { key, .. } | Self::UrlJson { key, .. } => key,
38        }
39    }
40}
41
/// Serializable envelope carrying a batch of resolver requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeResolverBatchRequest {
    /// The requests that make up the batch.
    pub requests: Vec<RuntimeResolverRequest>,
}
46
/// One resolved entry: a key paired with its JSON value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeResolverResponse {
    /// Key identifying the entry — presumably the `key` of the originating
    /// request; confirm against the backend contract.
    pub key: String,
    /// The resolved JSON value.
    pub value: Value,
}
52
/// Serializable envelope carrying the resolved entries of a batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeResolverBatchResponse {
    /// Resolved entries; deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub resolved: Vec<RuntimeResolverResponse>,
}
58
/// A VM resolver request paired with the backend request derived from it,
/// kept together while the backend call is outstanding.
#[derive(Debug)]
struct PendingRuntimeResolverRequest {
    /// The request as issued by the VM.
    request: ResolverRequest,
    /// The backend-facing form of `request`.
    backend_request: RuntimeResolverRequest,
}
64
65pub trait RuntimeResolver: Send + Sync {
66    fn resolve_batch<'a>(
67        &'a self,
68        requests: &'a [RuntimeResolverRequest],
69    ) -> ResolverBatchFuture<'a>;
70
71    fn resolve_and_apply<'a>(
72        &'a self,
73        vm: &'a std::sync::Mutex<VmContext>,
74        bytecode: &'a MultiEntityBytecode,
75        requests: Vec<ResolverRequest>,
76    ) -> ResolverApplyFuture<'a> {
77        Box::pin(async move {
78            if requests.is_empty() {
79                return Vec::new();
80            }
81
82            let mut cached = Vec::new();
83            let mut pending = Vec::new();
84            let mut invalid = Vec::new();
85
86            {
87                let mut vm_guard = vm.lock().unwrap_or_else(|e| e.into_inner());
88
89                for request in requests {
90                    let canonical_key =
91                        runtime_resolver_cache_key(&request.resolver, &request.input);
92
93                    if let Some(resolved_value) = vm_guard.get_cached_resolver_value(&canonical_key)
94                    {
95                        cached.push((request, resolved_value));
96                        continue;
97                    }
98
99                    match runtime_request_from_vm_request(&request) {
100                        Some(backend_request) => pending.push(PendingRuntimeResolverRequest {
101                            request,
102                            backend_request,
103                        }),
104                        None => invalid.push(request),
105                    }
106                }
107
108                if !invalid.is_empty() {
109                    vm_guard.restore_resolver_requests(invalid);
110                }
111            }
112
113            let resolved_map = if pending.is_empty() {
114                Ok(HashMap::new())
115            } else {
116                let mut unique = HashMap::new();
117                for entry in &pending {
118                    unique
119                        .entry(entry.backend_request.key().to_string())
120                        .or_insert_with(|| entry.backend_request.clone());
121                }
122
123                let unique_requests: Vec<RuntimeResolverRequest> = unique.into_values().collect();
124                self.resolve_batch(&unique_requests).await
125            };
126
127            let mut mutations = Vec::new();
128            let mut failed = Vec::new();
129            let mut vm_guard = vm.lock().unwrap_or_else(|e| e.into_inner());
130
131            for (request, resolved_value) in cached {
132                match vm_guard.apply_resolver_result(bytecode, &request.cache_key, resolved_value) {
133                    Ok(mut new_mutations) => mutations.append(&mut new_mutations),
134                    Err(err) => {
135                        tracing::warn!(
136                            cache_key = %request.cache_key,
137                            error = %err,
138                            "Failed to apply cached resolver result"
139                        );
140                        failed.push(request);
141                    }
142                }
143            }
144
145            match resolved_map {
146                Ok(resolved_map) => {
147                    for entry in pending {
148                        match resolved_map.get(entry.backend_request.key()) {
149                            Some(resolved_value) => match vm_guard.apply_resolver_result(
150                                bytecode,
151                                &entry.request.cache_key,
152                                resolved_value.clone(),
153                            ) {
154                                Ok(mut new_mutations) => mutations.append(&mut new_mutations),
155                                Err(err) => {
156                                    tracing::warn!(
157                                        cache_key = %entry.request.cache_key,
158                                        error = %err,
159                                        "Failed to apply resolver result"
160                                    );
161                                    failed.push(entry.request);
162                                }
163                            },
164                            None => failed.push(entry.request),
165                        }
166                    }
167                }
168                Err(err) => {
169                    tracing::warn!(error = %err, "Runtime resolver backend request failed");
170                    failed.extend(pending.into_iter().map(|entry| entry.request));
171                }
172            }
173
174            if !failed.is_empty() {
175                vm_guard.restore_resolver_requests(failed);
176            }
177
178            mutations
179        })
180    }
181}
182
183pub struct InProcessResolver {
184    token_client: Option<TokenMetadataResolverClient>,
185    url_client: UrlResolverClient,
186}
187
188impl InProcessResolver {
189    pub fn from_env() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
190        Ok(Self {
191            token_client: TokenMetadataResolverClient::from_env()?,
192            url_client: UrlResolverClient::new(),
193        })
194    }
195
196    pub fn new(
197        token_client: Option<TokenMetadataResolverClient>,
198        url_client: UrlResolverClient,
199    ) -> Self {
200        Self {
201            token_client,
202            url_client,
203        }
204    }
205
206    pub async fn resolve_batch_internal(
207        &self,
208        requests: &[RuntimeResolverRequest],
209    ) -> ResolverBatchResult {
210        let mut results = HashMap::new();
211        let mut token_requests = Vec::new();
212        let mut url_requests = Vec::new();
213
214        for request in requests {
215            match request {
216                RuntimeResolverRequest::TokenMetadata { key, mint } => {
217                    token_requests.push((key.clone(), mint.clone()));
218                }
219                RuntimeResolverRequest::UrlJson { key, url, method } => {
220                    url_requests.push((key.clone(), url.clone(), method.clone()));
221                }
222            }
223        }
224
225        if !token_requests.is_empty() {
226            if let Some(token_client) = &self.token_client {
227                let mints: Vec<String> = token_requests
228                    .iter()
229                    .map(|(_, mint)| mint.clone())
230                    .collect();
231                match token_client.resolve_token_metadata(&mints).await {
232                    Ok(resolved) => {
233                        for (key, mint) in token_requests {
234                            if let Some(value) = resolved.get(&mint) {
235                                results.insert(key, value.clone());
236                            }
237                        }
238                    }
239                    Err(err) => {
240                        tracing::warn!(error = %err, "Failed to resolve token metadata batch");
241                    }
242                }
243            } else {
244                tracing::warn!(
245                    count = token_requests.len(),
246                    "DAS_API_ENDPOINT not set; token resolver requests will be re-queued"
247                );
248            }
249        }
250
251        if !url_requests.is_empty() {
252            let mut unique = HashMap::new();
253            for (key, url, method) in &url_requests {
254                unique
255                    .entry((url.clone(), method.clone()))
256                    .or_insert_with(Vec::new)
257                    .push(key.clone());
258            }
259
260            let batch_input: Vec<(String, HttpMethod)> = unique.keys().cloned().collect();
261            let resolved = self.url_client.resolve_batch(&batch_input).await;
262
263            for ((url, method), keys) in unique {
264                if let Some(value) = resolved.get(&(url, method)) {
265                    for key in keys {
266                        results.insert(key, value.clone());
267                    }
268                }
269            }
270        }
271
272        Ok(results)
273    }
274}
275
276impl RuntimeResolver for InProcessResolver {
277    fn resolve_batch<'a>(
278        &'a self,
279        requests: &'a [RuntimeResolverRequest],
280    ) -> ResolverBatchFuture<'a> {
281        Box::pin(async move { self.resolve_batch_internal(requests).await })
282    }
283}
284
285pub fn runtime_resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
286    crate::vm::resolver_cache_key(resolver, input)
287}
288
289fn runtime_request_from_vm_request(request: &ResolverRequest) -> Option<RuntimeResolverRequest> {
290    match &request.resolver {
291        ResolverType::Token => extract_mint_from_input(&request.input).map(|mint| {
292            RuntimeResolverRequest::TokenMetadata {
293                key: request.cache_key.clone(),
294                mint,
295            }
296        }),
297        ResolverType::Url(config) => match &request.input {
298            Value::String(url) if !url.is_empty() => Some(RuntimeResolverRequest::UrlJson {
299                key: request.cache_key.clone(),
300                url: url.clone(),
301                method: config.method.clone(),
302            }),
303            _ => None,
304        },
305    }
306}
307
308fn extract_mint_from_input(input: &Value) -> Option<String> {
309    match input {
310        Value::String(value) if !value.is_empty() => Some(value.clone()),
311        Value::Object(map) => map
312            .get("mint")
313            .and_then(|value| value.as_str())
314            .filter(|value| !value.is_empty())
315            .map(str::to_string),
316        _ => None,
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// An object input with a `mint` field becomes a token metadata request
    /// keyed by the VM request's cache key.
    #[test]
    fn token_request_extracts_mint_from_object_input() {
        let vm_request = ResolverRequest {
            cache_key: String::from("token:mint"),
            resolver: ResolverType::Token,
            input: serde_json::json!({ "mint": "abc" }),
        };
        let expected = RuntimeResolverRequest::TokenMetadata {
            key: String::from("token:mint"),
            mint: String::from("abc"),
        };

        let actual = runtime_request_from_vm_request(&vm_request).unwrap();

        assert_eq!(actual, expected);
    }

    /// A URL request keeps the VM request's cache key and takes its method
    /// from the resolver configuration.
    #[test]
    fn url_request_uses_existing_cache_key() {
        use crate::ast::{UrlResolverConfig, UrlSource};

        let config = UrlResolverConfig {
            url_source: UrlSource::FieldPath(String::from("metadata_url")),
            method: HttpMethod::Get,
            extract_path: None,
        };
        let vm_request = ResolverRequest {
            cache_key: String::from("url:get:https://example.com"),
            resolver: ResolverType::Url(config),
            input: serde_json::json!("https://example.com"),
        };
        let expected = RuntimeResolverRequest::UrlJson {
            key: String::from("url:get:https://example.com"),
            url: String::from("https://example.com"),
            method: HttpMethod::Get,
        };

        let actual = runtime_request_from_vm_request(&vm_request).unwrap();

        assert_eq!(actual, expected);
    }
}