1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::ast::{HttpMethod, ResolverType};
9use crate::compiler::MultiEntityBytecode;
10use crate::resolvers::{TokenMetadataResolverClient, UrlResolverClient};
11use crate::vm::{ResolverRequest, VmContext};
12use crate::Mutation;
13
14pub type ResolverBatchResult =
15 Result<HashMap<String, Value>, Box<dyn std::error::Error + Send + Sync>>;
16pub type ResolverBatchFuture<'a> = Pin<Box<dyn Future<Output = ResolverBatchResult> + Send + 'a>>;
17pub type ResolverApplyFuture<'a> = Pin<Box<dyn Future<Output = Vec<Mutation>> + Send + 'a>>;
18pub type SharedRuntimeResolver = std::sync::Arc<dyn RuntimeResolver>;
19
20#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
21#[serde(tag = "type", rename_all = "snake_case")]
22pub enum RuntimeResolverRequest {
23 TokenMetadata {
24 key: String,
25 mint: String,
26 },
27 UrlJson {
28 key: String,
29 url: String,
30 method: HttpMethod,
31 },
32}
33
34impl RuntimeResolverRequest {
35 pub fn key(&self) -> &str {
36 match self {
37 Self::TokenMetadata { key, .. } | Self::UrlJson { key, .. } => key,
38 }
39 }
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct RuntimeResolverBatchRequest {
44 pub requests: Vec<RuntimeResolverRequest>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct RuntimeResolverResponse {
49 pub key: String,
50 pub value: Value,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct RuntimeResolverBatchResponse {
55 #[serde(default)]
56 pub resolved: Vec<RuntimeResolverResponse>,
57}
58
59#[derive(Debug)]
60struct PendingRuntimeResolverRequest {
61 request: ResolverRequest,
62 backend_request: RuntimeResolverRequest,
63}
64
65pub trait RuntimeResolver: Send + Sync {
66 fn resolve_batch<'a>(
67 &'a self,
68 requests: &'a [RuntimeResolverRequest],
69 ) -> ResolverBatchFuture<'a>;
70
71 fn resolve_and_apply<'a>(
72 &'a self,
73 vm: &'a std::sync::Mutex<VmContext>,
74 bytecode: &'a MultiEntityBytecode,
75 requests: Vec<ResolverRequest>,
76 ) -> ResolverApplyFuture<'a> {
77 Box::pin(async move {
78 if requests.is_empty() {
79 return Vec::new();
80 }
81
82 let mut cached = Vec::new();
83 let mut pending = Vec::new();
84 let mut invalid = Vec::new();
85
86 {
87 let mut vm_guard = vm.lock().unwrap_or_else(|e| e.into_inner());
88
89 for request in requests {
90 let canonical_key =
91 runtime_resolver_cache_key(&request.resolver, &request.input);
92
93 if let Some(resolved_value) = vm_guard.get_cached_resolver_value(&canonical_key)
94 {
95 cached.push((request, resolved_value));
96 continue;
97 }
98
99 match runtime_request_from_vm_request(&request) {
100 Some(backend_request) => pending.push(PendingRuntimeResolverRequest {
101 request,
102 backend_request,
103 }),
104 None => invalid.push(request),
105 }
106 }
107
108 if !invalid.is_empty() {
109 vm_guard.restore_resolver_requests(invalid);
110 }
111 }
112
113 let resolved_map = if pending.is_empty() {
114 Ok(HashMap::new())
115 } else {
116 let mut unique = HashMap::new();
117 for entry in &pending {
118 unique
119 .entry(entry.backend_request.key().to_string())
120 .or_insert_with(|| entry.backend_request.clone());
121 }
122
123 let unique_requests: Vec<RuntimeResolverRequest> = unique.into_values().collect();
124 self.resolve_batch(&unique_requests).await
125 };
126
127 let mut mutations = Vec::new();
128 let mut failed = Vec::new();
129 let mut vm_guard = vm.lock().unwrap_or_else(|e| e.into_inner());
130
131 for (request, resolved_value) in cached {
132 match vm_guard.apply_resolver_result(bytecode, &request.cache_key, resolved_value) {
133 Ok(mut new_mutations) => mutations.append(&mut new_mutations),
134 Err(err) => {
135 tracing::warn!(
136 cache_key = %request.cache_key,
137 error = %err,
138 "Failed to apply cached resolver result"
139 );
140 failed.push(request);
141 }
142 }
143 }
144
145 match resolved_map {
146 Ok(resolved_map) => {
147 for entry in pending {
148 match resolved_map.get(entry.backend_request.key()) {
149 Some(resolved_value) => match vm_guard.apply_resolver_result(
150 bytecode,
151 &entry.request.cache_key,
152 resolved_value.clone(),
153 ) {
154 Ok(mut new_mutations) => mutations.append(&mut new_mutations),
155 Err(err) => {
156 tracing::warn!(
157 cache_key = %entry.request.cache_key,
158 error = %err,
159 "Failed to apply resolver result"
160 );
161 failed.push(entry.request);
162 }
163 },
164 None => failed.push(entry.request),
165 }
166 }
167 }
168 Err(err) => {
169 tracing::warn!(error = %err, "Runtime resolver backend request failed");
170 failed.extend(pending.into_iter().map(|entry| entry.request));
171 }
172 }
173
174 if !failed.is_empty() {
175 vm_guard.restore_resolver_requests(failed);
176 }
177
178 mutations
179 })
180 }
181}
182
183pub struct InProcessResolver {
184 token_client: Option<TokenMetadataResolverClient>,
185 url_client: UrlResolverClient,
186}
187
188impl InProcessResolver {
189 pub fn from_env() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
190 Ok(Self {
191 token_client: TokenMetadataResolverClient::from_env()?,
192 url_client: UrlResolverClient::new(),
193 })
194 }
195
196 pub fn new(
197 token_client: Option<TokenMetadataResolverClient>,
198 url_client: UrlResolverClient,
199 ) -> Self {
200 Self {
201 token_client,
202 url_client,
203 }
204 }
205
206 pub async fn resolve_batch_internal(
207 &self,
208 requests: &[RuntimeResolverRequest],
209 ) -> ResolverBatchResult {
210 let mut results = HashMap::new();
211 let mut token_requests = Vec::new();
212 let mut url_requests = Vec::new();
213
214 for request in requests {
215 match request {
216 RuntimeResolverRequest::TokenMetadata { key, mint } => {
217 token_requests.push((key.clone(), mint.clone()));
218 }
219 RuntimeResolverRequest::UrlJson { key, url, method } => {
220 url_requests.push((key.clone(), url.clone(), method.clone()));
221 }
222 }
223 }
224
225 if !token_requests.is_empty() {
226 if let Some(token_client) = &self.token_client {
227 let mints: Vec<String> = token_requests
228 .iter()
229 .map(|(_, mint)| mint.clone())
230 .collect();
231 match token_client.resolve_token_metadata(&mints).await {
232 Ok(resolved) => {
233 for (key, mint) in token_requests {
234 if let Some(value) = resolved.get(&mint) {
235 results.insert(key, value.clone());
236 }
237 }
238 }
239 Err(err) => {
240 tracing::warn!(error = %err, "Failed to resolve token metadata batch");
241 }
242 }
243 } else {
244 tracing::warn!(
245 count = token_requests.len(),
246 "DAS_API_ENDPOINT not set; token resolver requests will be re-queued"
247 );
248 }
249 }
250
251 if !url_requests.is_empty() {
252 let mut unique = HashMap::new();
253 for (key, url, method) in &url_requests {
254 unique
255 .entry((url.clone(), method.clone()))
256 .or_insert_with(Vec::new)
257 .push(key.clone());
258 }
259
260 let batch_input: Vec<(String, HttpMethod)> = unique.keys().cloned().collect();
261 let resolved = self.url_client.resolve_batch(&batch_input).await;
262
263 for ((url, method), keys) in unique {
264 if let Some(value) = resolved.get(&(url, method)) {
265 for key in keys {
266 results.insert(key, value.clone());
267 }
268 }
269 }
270 }
271
272 Ok(results)
273 }
274}
275
276impl RuntimeResolver for InProcessResolver {
277 fn resolve_batch<'a>(
278 &'a self,
279 requests: &'a [RuntimeResolverRequest],
280 ) -> ResolverBatchFuture<'a> {
281 Box::pin(async move { self.resolve_batch_internal(requests).await })
282 }
283}
284
285pub fn runtime_resolver_cache_key(resolver: &ResolverType, input: &Value) -> String {
286 crate::vm::resolver_cache_key(resolver, input)
287}
288
289fn runtime_request_from_vm_request(request: &ResolverRequest) -> Option<RuntimeResolverRequest> {
290 match &request.resolver {
291 ResolverType::Token => extract_mint_from_input(&request.input).map(|mint| {
292 RuntimeResolverRequest::TokenMetadata {
293 key: request.cache_key.clone(),
294 mint,
295 }
296 }),
297 ResolverType::Url(config) => match &request.input {
298 Value::String(url) if !url.is_empty() => Some(RuntimeResolverRequest::UrlJson {
299 key: request.cache_key.clone(),
300 url: url.clone(),
301 method: config.method.clone(),
302 }),
303 _ => None,
304 },
305 }
306}
307
308fn extract_mint_from_input(input: &Value) -> Option<String> {
309 match input {
310 Value::String(value) if !value.is_empty() => Some(value.clone()),
311 Value::Object(map) => map
312 .get("mint")
313 .and_then(|value| value.as_str())
314 .filter(|value| !value.is_empty())
315 .map(str::to_string),
316 _ => None,
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{UrlResolverConfig, UrlSource};

    /// Builds a VM-level request from its three parts.
    fn vm_request(cache_key: &str, resolver: ResolverType, input: Value) -> ResolverRequest {
        ResolverRequest {
            cache_key: cache_key.to_string(),
            resolver,
            input,
        }
    }

    /// An object input with a `mint` field yields a token metadata request
    /// keyed by the VM request's cache key.
    #[test]
    fn token_request_extracts_mint_from_object_input() {
        let request = vm_request(
            "token:mint",
            ResolverType::Token,
            serde_json::json!({ "mint": "abc" }),
        );

        let expected = RuntimeResolverRequest::TokenMetadata {
            key: "token:mint".to_string(),
            mint: "abc".to_string(),
        };

        assert_eq!(runtime_request_from_vm_request(&request), Some(expected));
    }

    /// A URL request keeps the cache key already present on the VM request
    /// and takes its method from the resolver config.
    #[test]
    fn url_request_uses_existing_cache_key() {
        let config = UrlResolverConfig {
            url_source: UrlSource::FieldPath("metadata_url".to_string()),
            method: HttpMethod::Get,
            extract_path: None,
        };
        let request = vm_request(
            "url:get:https://example.com",
            ResolverType::Url(config),
            serde_json::json!("https://example.com"),
        );

        let expected = RuntimeResolverRequest::UrlJson {
            key: "url:get:https://example.com".to_string(),
            url: "https://example.com".to_string(),
            method: HttpMethod::Get,
        };

        assert_eq!(runtime_request_from_vm_request(&request), Some(expected));
    }
}