1use serde::{Deserialize, Serialize};
38use serde_json;
39use std::collections::HashMap;
40use truthlinked_governance::params as gp;
41use truthlinked_governance::{CellVisibility, SchemaEntry, UrlProposal, UrlResponseFormat};
42use truthlinked_staking::StakingState;
43
44pub fn request_id(
52 url: &str,
53 method: &str,
54 body: &[u8],
55 format: UrlResponseFormat,
56 schema_id: Option<[u8; 32]>,
57) -> [u8; 32] {
58 let mut h = blake3::Hasher::new();
59 h.update(b"oracle:request:");
60 h.update(url.as_bytes());
61 h.update(b":");
62 h.update(method.as_bytes());
63 h.update(b":");
64 h.update(body);
65 h.update(b":");
66 h.update(match format {
67 UrlResponseFormat::Raw => b"raw",
68 UrlResponseFormat::JsonCanonical => b"json",
69 UrlResponseFormat::PriceUsd => b"price_usd",
70 });
71 if let Some(id) = schema_id {
72 h.update(b":schema:");
73 h.update(&id);
74 }
75 *h.finalize().as_bytes()
76}
77
78pub fn compute_commit_hash(
81 validator_pk: &[u8],
82 req_id: &[u8; 32],
83 response_body: &[u8],
84 response_status: u16,
85) -> [u8; 32] {
86 let mut h = blake3::Hasher::new();
87 h.update(b"oracle:commit:");
88 h.update(validator_pk);
89 h.update(req_id);
90 h.update(response_body);
91 h.update(&response_status.to_le_bytes());
92 *h.finalize().as_bytes()
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct OracleRequest {
98 pub request_id: [u8; 32],
100 pub url: String,
101 pub method: String, pub body: Vec<u8>,
103 pub response_format: UrlResponseFormat,
104 pub schema_id: Option<[u8; 32]>,
105 pub requested_at: u64,
107 pub expires_at: u64,
109 pub requesting_cell: [u8; 32],
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct OracleCommit {
117 pub request_id: [u8; 32],
118 pub commit_hash: [u8; 32],
120 pub validator_pk: Vec<u8>,
122 pub committed_at: u64,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct OracleReveal {
129 pub request_id: [u8; 32],
130 pub response_body: Vec<u8>,
131 pub response_status: u16,
132 pub validator_pk: Vec<u8>,
133 pub revealed_at: u64,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct OracleCommitPayload {
139 pub request_id: [u8; 32],
140 pub commit_hash: [u8; 32],
141 #[serde(skip)]
143 pub response_body: Vec<u8>,
144 #[serde(skip)]
146 pub response_status: u16,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
152pub struct OracleResult {
153 pub request_id: [u8; 32],
154 pub url: String,
155 pub method: String,
156 pub response_body: Vec<u8>,
157 pub response_status: u16,
158 pub body_hash: [u8; 32],
160 pub finalized_at: u64,
162 pub expires_at: u64,
164 pub quorum_stake_num: u64,
166 pub quorum_stake_den: u64,
167 pub requesting_cell: [u8; 32],
169}
170
171impl OracleResult {
172 pub fn is_expired(&self, current_height: u64) -> bool {
173 current_height >= self.expires_at
174 }
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize, Default)]
180pub struct OracleTally {
181 pub request_id: [u8; 32],
182 pub commits: HashMap<Vec<u8>, [u8; 32]>,
184 pub reveals: HashMap<Vec<u8>, (Vec<u8>, u16)>,
186 pub committed_stake: u64,
188 pub total_stake: u64,
189 pub commit_phase_closed: bool,
191}
192
193impl OracleTally {
194 pub fn try_finalize(&self, staking: &StakingState) -> Option<(Vec<u8>, u16, u64, u64)> {
197 self.try_finalize_with_format(staking, UrlResponseFormat::Raw)
198 }
199
200 pub fn try_finalize_with_format(
201 &self,
202 staking: &StakingState,
203 response_format: UrlResponseFormat,
204 ) -> Option<(Vec<u8>, u16, u64, u64)> {
205 if response_format == UrlResponseFormat::PriceUsd {
206 return self.try_finalize_price_usd(staking);
207 }
208
209 let mut tally: HashMap<[u8; 32], (Vec<u8>, u16, u64)> = HashMap::new();
211
212 for (val_pk, (body, status)) in &self.reveals {
213 if let Some(commit_hash) = self.commits.get(val_pk.as_slice()) {
215 let expected = compute_commit_hash(val_pk, &self.request_id, body, *status);
216 if expected != *commit_hash {
217 continue; }
219 } else {
220 continue; }
222
223 let val_stake = staking
224 .validators
225 .get(val_pk.as_slice())
226 .map(|v| v.active_stake)
227 .unwrap_or(0);
228
229 let body_hash: [u8; 32] = (*blake3::hash(body).as_bytes()).into();
230 let entry = tally.entry(body_hash).or_insert((body.clone(), *status, 0));
231 entry.2 += val_stake;
232 }
233
234 let current_height = staking.current_height;
235 let total_stake: u64 = staking
236 .validators
237 .values()
238 .filter(|v| v.is_active(current_height))
239 .map(|v| v.active_stake)
240 .sum();
241 if total_stake == 0 {
242 return None;
243 }
244
245 for (_hash, (body, status, agreeing_stake)) in tally {
247 let pct = (agreeing_stake * 100) / total_stake;
248 if pct >= gp::get_u64(gp::PARAM_ORACLE_REVEAL_QUORUM_PERCENT) {
249 return Some((body, status, agreeing_stake, total_stake));
250 }
251 }
252
253 None
254 }
255
256 fn try_finalize_price_usd(&self, staking: &StakingState) -> Option<(Vec<u8>, u16, u64, u64)> {
257 const PRICE_TOLERANCE_BPS: u64 = 10;
258
259 let current_height = staking.current_height;
260 let total_stake: u64 = staking
261 .validators
262 .values()
263 .filter(|v| v.is_active(current_height))
264 .map(|v| v.active_stake)
265 .sum();
266 if total_stake == 0 {
267 return None;
268 }
269
270 let mut samples: Vec<(u64, u64, u16)> = Vec::new();
271 for (val_pk, (body, status)) in &self.reveals {
272 let commit_hash = self.commits.get(val_pk.as_slice())?;
273 let expected = compute_commit_hash(val_pk, &self.request_id, body, *status);
274 if expected != *commit_hash {
275 continue;
276 }
277 let stake = staking
278 .validators
279 .get(val_pk.as_slice())
280 .map(|v| v.active_stake)
281 .unwrap_or(0);
282 if stake == 0 {
283 continue;
284 }
285 if let Some(price) = parse_price_usd_micros(body) {
286 samples.push((price, stake, *status));
287 }
288 }
289 samples.sort_by_key(|(price, _, _)| *price);
290
291 let mut best: Option<(usize, usize, u64)> = None;
292 for start in 0..samples.len() {
293 let anchor = samples[start].0.max(1);
294 let tolerance = ((anchor as u128) * PRICE_TOLERANCE_BPS as u128 / 10_000u128) as u64;
295 let mut stake_sum = 0u64;
296 let mut end = start;
297 while end < samples.len() && samples[end].0.saturating_sub(anchor) <= tolerance {
298 stake_sum = stake_sum.saturating_add(samples[end].1);
299 end += 1;
300 }
301 if best
302 .map(|(_, _, best_stake)| stake_sum > best_stake)
303 .unwrap_or(true)
304 {
305 best = Some((start, end, stake_sum));
306 }
307 }
308
309 let (start, end, agreeing_stake) = best?;
310 if (agreeing_stake * 100) / total_stake
311 < gp::get_u64(gp::PARAM_ORACLE_REVEAL_QUORUM_PERCENT)
312 {
313 return None;
314 }
315
316 let target = agreeing_stake.saturating_add(1) / 2;
317 let mut cumulative = 0u64;
318 let mut median = samples[start].0;
319 for (price, stake, _) in &samples[start..end] {
320 cumulative = cumulative.saturating_add(*stake);
321 if cumulative >= target {
322 median = *price;
323 break;
324 }
325 }
326
327 let status = samples[start..end]
328 .iter()
329 .find(|(price, _, _)| *price == median)
330 .map(|(_, _, status)| *status)
331 .unwrap_or(200);
332 let body = serde_json::json!({
333 "kind": "price_usd",
334 "price_usd_micros": median,
335 "tolerance_bps": PRICE_TOLERANCE_BPS,
336 "samples": end - start
337 });
338 serde_json::to_vec(&body)
339 .ok()
340 .map(|body| (body, status, agreeing_stake, total_stake))
341 }
342
343 pub fn commit_quorum_reached(&self) -> bool {
345 if self.total_stake == 0 {
346 return false;
347 }
348 (self.committed_stake * 100) / self.total_stake
349 >= gp::get_u64(gp::PARAM_ORACLE_COMMIT_QUORUM_PERCENT)
350 }
351}
352
353pub mod return_codes {
361 pub const OK: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_OK;
363 pub const MEM_ERR: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_MEM_ERR;
365 pub const ENCODING_ERR: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_ENCODING_ERR;
367 pub const URL_NOT_APPROVED: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_URL_NOT_APPROVED;
369 pub const ORACLE_PENDING: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_PENDING;
371 pub const ORACLE_EXPIRED: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_EXPIRED;
373 pub const RESPONSE_TOO_LARGE: i32 =
375 truthlinked_core::constants::HTTP_ORACLE_RC_RESPONSE_TOO_LARGE;
376 pub const DEPTH_LIMIT_EXCEEDED: i32 =
378 truthlinked_core::constants::HTTP_ORACLE_RC_DEPTH_LIMIT_EXCEEDED;
379 pub const INVALID_METHOD: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_INVALID_METHOD;
381}
382
383pub fn check_url_permitted(
387 url: &str,
388 visibility: CellVisibility,
389 url_proposals: &im::HashMap<String, UrlProposal>,
390) -> bool {
391 match visibility {
392 CellVisibility::Private => true,
393 CellVisibility::Public => url_proposals
394 .values()
395 .any(|p| p.approved && url_matches_pattern(url, &p.url_pattern)),
396 }
397}
398
399pub fn url_matches_pattern(url: &str, pattern: &str) -> bool {
401 if pattern.ends_with("/*") {
402 url.starts_with(&pattern[..pattern.len() - 2])
403 } else {
404 url == pattern
405 }
406}
407
408pub fn queue_oracle_request(
412 url: String,
413 method: String,
414 body: Vec<u8>,
415 response_format: UrlResponseFormat,
416 schema_id: Option<[u8; 32]>,
417 requesting_cell: [u8; 32],
418 current_height: u64,
419) -> OracleRequest {
420 let req_id = request_id(&url, &method, &body, response_format, schema_id);
421 OracleRequest {
422 request_id: req_id,
423 url,
424 method,
425 body,
426 response_format,
427 schema_id,
428 requested_at: current_height,
429 expires_at: current_height + gp::get_u64(gp::PARAM_ORACLE_REQUEST_TIMEOUT_BLOCKS),
430 requesting_cell,
431 }
432}
433
434pub async fn validator_fetch_and_commit(
443 pending_requests: &[OracleRequest],
444 validator_pk: &[u8],
445 current_height: u64,
446 schema_registry: &im::HashMap<[u8; 32], SchemaEntry>,
447) -> Vec<OracleCommitPayload> {
448 let client = match reqwest::Client::builder()
449 .timeout(std::time::Duration::from_millis(gp::get_u64(
450 gp::PARAM_HTTP_TIMEOUT_MS,
451 )))
452 .build()
453 {
454 Ok(c) => c,
455 Err(_) => return vec![],
456 };
457
458 let mut commits = Vec::new();
459
460 for req in pending_requests {
461 if req.expires_at < current_height {
462 tracing::warn!(
463 " Oracle: request {} expired ({} < {})",
464 hex::encode(&req.request_id[..4]),
465 req.expires_at,
466 current_height
467 );
468 continue;
469 }
470
471 if req.body.len() > gp::get_usize(gp::PARAM_MAX_HTTP_BODY_BYTES) {
472 tracing::warn!(
473 " Oracle: request {} body too large",
474 hex::encode(&req.request_id[..4])
475 );
476 continue;
477 }
478
479 let result = execute_http_fetch(&client, req).await;
480
481 let (response_body, response_status) = match result {
482 Ok((body, status)) if body.len() <= gp::get_usize(gp::PARAM_MAX_RESPONSE_BYTES) => {
483 (body, status)
484 }
485 Ok((body, _)) => {
486 tracing::warn!(" Oracle: response too large ({} bytes)", body.len());
487 continue;
488 }
489 Err(e) => {
490 tracing::warn!(" Oracle: fetch error for {}: {}", req.url, e);
491 continue;
492 }
493 };
494
495 let canonical_body = match canonicalize_response(req.response_format, &response_body) {
496 Ok(v) => v,
497 Err(e) => {
498 tracing::warn!(" Oracle: canonicalize failed: {}", e);
499 continue;
500 }
501 };
502 let canonical_body = if let Some(schema_id) = req.schema_id {
504 match project_by_schema(schema_id, &canonical_body, schema_registry) {
505 Ok(projected) => projected,
506 Err(_) => continue, }
508 } else {
509 canonical_body
510 };
511 if canonical_body.len() > gp::get_usize(gp::PARAM_MAX_RESPONSE_BYTES) {
512 continue;
513 }
514 let commit_hash = compute_commit_hash(
515 validator_pk,
516 &req.request_id,
517 &canonical_body,
518 response_status,
519 );
520
521 commits.push(OracleCommitPayload {
522 request_id: req.request_id,
523 commit_hash,
524 response_body: canonical_body, response_status,
526 });
527 }
528
529 commits
530}
531
532fn canonicalize_response(format: UrlResponseFormat, body: &[u8]) -> Result<Vec<u8>, String> {
533 match format {
534 UrlResponseFormat::Raw => {
535 if body.first() == Some(&b'{') || body.first() == Some(&b'[') {
539 canonicalize_json(body).or_else(|_| Ok(body.to_vec()))
540 } else {
541 Ok(body.to_vec())
542 }
543 }
544 UrlResponseFormat::JsonCanonical => canonicalize_json(body),
545 UrlResponseFormat::PriceUsd => canonicalize_price_usd(body),
546 }
547}
548
549fn canonicalize_price_usd(body: &[u8]) -> Result<Vec<u8>, String> {
550 let price = parse_price_usd_micros(body).ok_or("price_usd not found")?;
551 let value: serde_json::Value = serde_json::from_slice(body).unwrap_or(serde_json::Value::Null);
552 let pair = value
553 .get("symbol")
554 .or_else(|| value.get("pair"))
555 .or_else(|| value.get("market"))
556 .and_then(|v| v.as_str())
557 .unwrap_or("BTC-USD");
558 serde_json::to_vec(&serde_json::json!({
559 "kind": "price_usd_sample",
560 "pair": pair,
561 "price_usd_micros": price
562 }))
563 .map_err(|e| format!("json encode error: {}", e))
564}
565
566fn parse_price_usd_micros(body: &[u8]) -> Option<u64> {
567 let value: serde_json::Value = serde_json::from_slice(body).ok()?;
568 let candidates = [
569 "price_usd_micros",
570 "price_usd_cents",
571 "price_usd",
572 "price",
573 "last",
574 "rate",
575 "amount",
576 "data.price",
577 "data.amount",
578 "result.price",
579 ];
580 for path in candidates {
581 if let Some(v) = json_path(&value, path) {
582 let multiplier = match path {
583 "price_usd_micros" => 1.0,
584 "price_usd_cents" => 10_000.0,
585 _ => 1_000_000.0,
586 };
587 if let Some(n) = json_number(v) {
588 if n.is_finite() && n > 0.0 {
589 return Some((n * multiplier).round() as u64);
590 }
591 }
592 }
593 }
594 None
595}
596
597fn json_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
598 let mut cur = value;
599 for part in path.split('.') {
600 cur = cur.get(part)?;
601 }
602 Some(cur)
603}
604
605fn json_number(value: &serde_json::Value) -> Option<f64> {
606 match value {
607 serde_json::Value::Number(n) => n.as_f64(),
608 serde_json::Value::String(s) => s.replace(',', "").parse::<f64>().ok(),
609 _ => None,
610 }
611}
612
613fn canonicalize_json(body: &[u8]) -> Result<Vec<u8>, String> {
614 let value: serde_json::Value =
615 serde_json::from_slice(body).map_err(|e| format!("invalid json: {}", e))?;
616 let normalized = normalize_json(value);
617 serde_json::to_vec(&normalized).map_err(|e| format!("json encode error: {}", e))
618}
619
620const NON_DETERMINISTIC_FIELDS: &[&str] = &[
624 "timestamp",
625 "ts",
626 "time",
627 "date",
628 "datetime",
629 "created_at",
630 "updated_at",
631 "request_id",
632 "requestId",
633 "req_id",
634 "reqId",
635 "trace_id",
636 "traceId",
637 "nonce",
638 "random",
639 "seed",
640 "session_id",
641 "sessionId",
642];
643
644fn normalize_json(value: serde_json::Value) -> serde_json::Value {
645 match value {
646 serde_json::Value::Object(map) => {
647 let mut keys: Vec<String> = map
648 .keys()
649 .filter(|k| !NON_DETERMINISTIC_FIELDS.contains(&k.as_str()))
650 .cloned()
651 .collect();
652 keys.sort();
653 let mut new_map = serde_json::Map::new();
654 for key in keys {
655 if let Some(v) = map.get(&key) {
656 new_map.insert(key, normalize_json(v.clone()));
657 }
658 }
659 serde_json::Value::Object(new_map)
660 }
661 serde_json::Value::Array(items) => {
662 serde_json::Value::Array(items.into_iter().map(normalize_json).collect())
663 }
664 other => other,
665 }
666}
667
668fn project_by_schema(
672 schema_id: [u8; 32],
673 canonical_body: &[u8],
674 schema_registry: &im::HashMap<[u8; 32], SchemaEntry>,
675) -> Result<Vec<u8>, String> {
676 let entry = match schema_registry.get(&schema_id) {
677 Some(e) if e.approved => e,
678 _ => return Err("schema not approved".into()),
679 };
680 let value: serde_json::Value =
681 serde_json::from_slice(canonical_body).map_err(|e| format!("invalid json: {}", e))?;
682 let obj = match value.as_object() {
683 Some(o) => o,
684 None => return Err("schema expects object at root".into()),
685 };
686 let mut projected = serde_json::Map::new();
688 let mut sorted_keys = entry.keys.clone();
689 sorted_keys.sort();
690 for key in &sorted_keys {
691 let val = resolve_path(obj, key);
693 match val {
694 Some(v) => {
695 projected.insert(key.clone(), v.clone());
696 }
697 None => return Err(format!("schema key {} not found in response", key)),
698 }
699 }
700 serde_json::to_vec(&serde_json::Value::Object(projected))
701 .map_err(|e| format!("json encode error: {}", e))
702}
703
704fn resolve_path<'a>(
708 obj: &'a serde_json::Map<String, serde_json::Value>,
709 path: &str,
710) -> Option<&'a serde_json::Value> {
711 let mut parts = path.splitn(2, '.');
712 let key = parts.next()?;
713 let val = obj.get(key)?;
714 match parts.next() {
715 None => Some(val),
716 Some(rest) => val.as_object().and_then(|o| resolve_path(o, rest)),
717 }
718}
719
720pub fn url_response_format(
722 url: &str,
723 visibility: CellVisibility,
724 url_proposals: &im::HashMap<String, UrlProposal>,
725) -> UrlResponseFormat {
726 match visibility {
727 CellVisibility::Private => UrlResponseFormat::Raw,
728 CellVisibility::Public => url_proposals
729 .values()
730 .find(|p| p.approved && url_matches_pattern(url, &p.url_pattern))
731 .map(|p| p.response_format)
732 .unwrap_or(UrlResponseFormat::Raw),
733 }
734}
735
736pub fn url_schema_id(
737 url: &str,
738 visibility: CellVisibility,
739 url_proposals: &im::HashMap<String, UrlProposal>,
740) -> Option<[u8; 32]> {
741 match visibility {
742 CellVisibility::Private => None,
743 CellVisibility::Public => url_proposals
744 .values()
745 .find(|p| p.approved && url_matches_pattern(url, &p.url_pattern))
746 .and_then(|p| p.schema_id),
747 }
748}
749
750fn strip_accord_query_params(url: &str) -> String {
751 let Some((base, query)) = url.split_once('?') else {
752 return url.to_string();
753 };
754 let kept: Vec<&str> = query
755 .split('&')
756 .filter(|part| !part.starts_with("accord_format="))
757 .filter(|part| !part.is_empty())
758 .collect();
759 if kept.is_empty() {
760 base.to_string()
761 } else {
762 format!("{}?{}", base, kept.join("&"))
763 }
764}
765
766async fn execute_http_fetch(
768 client: &reqwest::Client,
769 req: &OracleRequest,
770) -> Result<(Vec<u8>, u16), String> {
771 let fetch_url = strip_accord_query_params(&req.url);
772 let builder = match req.method.as_str() {
773 "GET" => client.get(&fetch_url),
774 "POST" => client.post(&fetch_url).body(req.body.clone()),
775 "PUT" => client.put(&fetch_url).body(req.body.clone()),
776 "DELETE" => client.delete(&fetch_url),
777 _ => return Err(format!("Unsupported method: {}", req.method)),
778 };
779
780 let response = builder
781 .send()
782 .await
783 .map_err(|e| format!("HTTP error: {}", e))?;
784
785 let status = response.status().as_u16();
786
787 if let Some(len) = response.content_length() {
788 if len as usize > gp::get_usize(gp::PARAM_MAX_RESPONSE_BYTES) {
789 return Err("Response too large".to_string());
790 }
791 }
792
793 let body = response
794 .bytes()
795 .await
796 .map_err(|e| format!("Body read error: {}", e))?
797 .to_vec();
798
799 if body.len() > gp::get_usize(gp::PARAM_MAX_RESPONSE_BYTES) {
800 return Err("Response too large".to_string());
801 }
802
803 Ok((body, status))
804}
805
806pub mod storage_keys {
811 pub fn oracle_request(req_id: &[u8; 32]) -> [u8; 32] {
813 let mut h = blake3::Hasher::new();
814 h.update(b"oracle:req:");
815 h.update(req_id);
816 (*h.finalize().as_bytes()).into()
817 }
818
819 pub fn oracle_tally(req_id: &[u8; 32]) -> [u8; 32] {
821 let mut h = blake3::Hasher::new();
822 h.update(b"oracle:tally:");
823 h.update(req_id);
824 (*h.finalize().as_bytes()).into()
825 }
826
827 pub fn oracle_result(req_id: &[u8; 32]) -> [u8; 32] {
829 let mut h = blake3::Hasher::new();
830 h.update(b"oracle:result:");
831 h.update(req_id);
832 (*h.finalize().as_bytes()).into()
833 }
834
835 pub fn url_proposal(pattern: &str) -> [u8; 32] {
837 let mut h = blake3::Hasher::new();
838 h.update(b"url:proposal:");
839 h.update(pattern.as_bytes());
840 (*h.finalize().as_bytes()).into()
841 }
842
843 pub fn cell_visibility(cell_id: &[u8; 32]) -> [u8; 32] {
845 let mut h = blake3::Hasher::new();
846 h.update(b"cell:vis:");
847 h.update(cell_id);
848 (*h.finalize().as_bytes()).into()
849 }
850}