use serde::{Deserialize, Serialize};
use serde_json;
use std::collections::HashMap;
use truthlinked_governance::params as gp;
use truthlinked_governance::{CellVisibility, SchemaEntry, UrlProposal, UrlResponseFormat};
use truthlinked_staking::StakingState;
pub fn request_id(
url: &str,
method: &str,
body: &[u8],
format: UrlResponseFormat,
schema_id: Option<[u8; 32]>,
) -> [u8; 32] {
let mut h = blake3::Hasher::new();
h.update(b"oracle:request:");
h.update(url.as_bytes());
h.update(b":");
h.update(method.as_bytes());
h.update(b":");
h.update(body);
h.update(b":");
h.update(match format {
UrlResponseFormat::Raw => b"raw",
UrlResponseFormat::JsonCanonical => b"json",
UrlResponseFormat::PriceUsd => b"price_usd",
});
if let Some(id) = schema_id {
h.update(b":schema:");
h.update(&id);
}
*h.finalize().as_bytes()
}
pub fn compute_commit_hash(
validator_pk: &[u8],
req_id: &[u8; 32],
response_body: &[u8],
response_status: u16,
) -> [u8; 32] {
let mut h = blake3::Hasher::new();
h.update(b"oracle:commit:");
h.update(validator_pk);
h.update(req_id);
h.update(response_body);
h.update(&response_status.to_le_bytes());
*h.finalize().as_bytes()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OracleRequest {
pub request_id: [u8; 32],
pub url: String,
pub method: String, pub body: Vec<u8>,
pub response_format: UrlResponseFormat,
pub schema_id: Option<[u8; 32]>,
pub requested_at: u64,
pub expires_at: u64,
pub requesting_cell: [u8; 32],
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OracleCommit {
pub request_id: [u8; 32],
pub commit_hash: [u8; 32],
pub validator_pk: Vec<u8>,
pub committed_at: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OracleReveal {
pub request_id: [u8; 32],
pub response_body: Vec<u8>,
pub response_status: u16,
pub validator_pk: Vec<u8>,
pub revealed_at: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OracleCommitPayload {
pub request_id: [u8; 32],
pub commit_hash: [u8; 32],
#[serde(skip)]
pub response_body: Vec<u8>,
#[serde(skip)]
pub response_status: u16,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OracleResult {
pub request_id: [u8; 32],
pub url: String,
pub method: String,
pub response_body: Vec<u8>,
pub response_status: u16,
pub body_hash: [u8; 32],
pub finalized_at: u64,
pub expires_at: u64,
pub quorum_stake_num: u64,
pub quorum_stake_den: u64,
pub requesting_cell: [u8; 32],
}
impl OracleResult {
pub fn is_expired(&self, current_height: u64) -> bool {
current_height >= self.expires_at
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct OracleTally {
pub request_id: [u8; 32],
pub commits: HashMap<Vec<u8>, [u8; 32]>,
pub reveals: HashMap<Vec<u8>, (Vec<u8>, u16)>,
pub committed_stake: u64,
pub total_stake: u64,
pub commit_phase_closed: bool,
}
impl OracleTally {
pub fn try_finalize(&self, staking: &StakingState) -> Option<(Vec<u8>, u16, u64, u64)> {
self.try_finalize_with_format(staking, UrlResponseFormat::Raw)
}
pub fn try_finalize_with_format(
&self,
staking: &StakingState,
response_format: UrlResponseFormat,
) -> Option<(Vec<u8>, u16, u64, u64)> {
if response_format == UrlResponseFormat::PriceUsd {
return self.try_finalize_price_usd(staking);
}
let mut tally: HashMap<[u8; 32], (Vec<u8>, u16, u64)> = HashMap::new();
for (val_pk, (body, status)) in &self.reveals {
if let Some(commit_hash) = self.commits.get(val_pk.as_slice()) {
let expected = compute_commit_hash(val_pk, &self.request_id, body, *status);
if expected != *commit_hash {
continue; }
} else {
continue; }
let val_stake = staking
.validators
.get(val_pk.as_slice())
.map(|v| v.active_stake)
.unwrap_or(0);
let body_hash: [u8; 32] = (*blake3::hash(body).as_bytes()).into();
let entry = tally.entry(body_hash).or_insert((body.clone(), *status, 0));
entry.2 += val_stake;
}
let current_height = staking.current_height;
let total_stake: u64 = staking
.validators
.values()
.filter(|v| v.is_active(current_height))
.map(|v| v.active_stake)
.sum();
if total_stake == 0 {
return None;
}
for (_hash, (body, status, agreeing_stake)) in tally {
let pct = (agreeing_stake * 100) / total_stake;
if pct >= gp::get_u64(gp::PARAM_ORACLE_REVEAL_QUORUM_PERCENT) {
return Some((body, status, agreeing_stake, total_stake));
}
}
None
}
fn try_finalize_price_usd(&self, staking: &StakingState) -> Option<(Vec<u8>, u16, u64, u64)> {
const PRICE_TOLERANCE_BPS: u64 = 10;
let current_height = staking.current_height;
let total_stake: u64 = staking
.validators
.values()
.filter(|v| v.is_active(current_height))
.map(|v| v.active_stake)
.sum();
if total_stake == 0 {
return None;
}
let mut samples: Vec<(u64, u64, u16)> = Vec::new();
for (val_pk, (body, status)) in &self.reveals {
let commit_hash = self.commits.get(val_pk.as_slice())?;
let expected = compute_commit_hash(val_pk, &self.request_id, body, *status);
if expected != *commit_hash {
continue;
}
let stake = staking
.validators
.get(val_pk.as_slice())
.map(|v| v.active_stake)
.unwrap_or(0);
if stake == 0 {
continue;
}
if let Some(price) = parse_price_usd_micros(body) {
samples.push((price, stake, *status));
}
}
samples.sort_by_key(|(price, _, _)| *price);
let mut best: Option<(usize, usize, u64)> = None;
for start in 0..samples.len() {
let anchor = samples[start].0.max(1);
let tolerance = ((anchor as u128) * PRICE_TOLERANCE_BPS as u128 / 10_000u128) as u64;
let mut stake_sum = 0u64;
let mut end = start;
while end < samples.len() && samples[end].0.saturating_sub(anchor) <= tolerance {
stake_sum = stake_sum.saturating_add(samples[end].1);
end += 1;
}
if best
.map(|(_, _, best_stake)| stake_sum > best_stake)
.unwrap_or(true)
{
best = Some((start, end, stake_sum));
}
}
let (start, end, agreeing_stake) = best?;
if (agreeing_stake * 100) / total_stake
< gp::get_u64(gp::PARAM_ORACLE_REVEAL_QUORUM_PERCENT)
{
return None;
}
let target = agreeing_stake.saturating_add(1) / 2;
let mut cumulative = 0u64;
let mut median = samples[start].0;
for (price, stake, _) in &samples[start..end] {
cumulative = cumulative.saturating_add(*stake);
if cumulative >= target {
median = *price;
break;
}
}
let status = samples[start..end]
.iter()
.find(|(price, _, _)| *price == median)
.map(|(_, _, status)| *status)
.unwrap_or(200);
let body = serde_json::json!({
"kind": "price_usd",
"price_usd_micros": median,
"tolerance_bps": PRICE_TOLERANCE_BPS,
"samples": end - start
});
serde_json::to_vec(&body)
.ok()
.map(|body| (body, status, agreeing_stake, total_stake))
}
pub fn commit_quorum_reached(&self) -> bool {
if self.total_stake == 0 {
return false;
}
(self.committed_stake * 100) / self.total_stake
>= gp::get_u64(gp::PARAM_ORACLE_COMMIT_QUORUM_PERCENT)
}
}
pub mod return_codes {
pub const OK: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_OK;
pub const MEM_ERR: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_MEM_ERR;
pub const ENCODING_ERR: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_ENCODING_ERR;
pub const URL_NOT_APPROVED: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_URL_NOT_APPROVED;
pub const ORACLE_PENDING: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_PENDING;
pub const ORACLE_EXPIRED: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_EXPIRED;
pub const RESPONSE_TOO_LARGE: i32 =
truthlinked_core::constants::HTTP_ORACLE_RC_RESPONSE_TOO_LARGE;
pub const DEPTH_LIMIT_EXCEEDED: i32 =
truthlinked_core::constants::HTTP_ORACLE_RC_DEPTH_LIMIT_EXCEEDED;
pub const INVALID_METHOD: i32 = truthlinked_core::constants::HTTP_ORACLE_RC_INVALID_METHOD;
}
pub fn check_url_permitted(
url: &str,
visibility: CellVisibility,
url_proposals: &im::HashMap<String, UrlProposal>,
) -> bool {
match visibility {
CellVisibility::Private => true,
CellVisibility::Public => url_proposals
.values()
.any(|p| p.approved && url_matches_pattern(url, &p.url_pattern)),
}
}
pub fn url_matches_pattern(url: &str, pattern: &str) -> bool {
if pattern.ends_with("/*") {
url.starts_with(&pattern[..pattern.len() - 2])
} else {
url == pattern
}
}
pub fn queue_oracle_request(
url: String,
method: String,
body: Vec<u8>,
response_format: UrlResponseFormat,
schema_id: Option<[u8; 32]>,
requesting_cell: [u8; 32],
current_height: u64,
) -> OracleRequest {
let req_id = request_id(&url, &method, &body, response_format, schema_id);
OracleRequest {
request_id: req_id,
url,
method,
body,
response_format,
schema_id,
requested_at: current_height,
expires_at: current_height + gp::get_u64(gp::PARAM_ORACLE_REQUEST_TIMEOUT_BLOCKS),
requesting_cell,
}
}
pub async fn validator_fetch_and_commit(
pending_requests: &[OracleRequest],
validator_pk: &[u8],
current_height: u64,
schema_registry: &im::HashMap<[u8; 32], SchemaEntry>,
) -> Vec<OracleCommitPayload> {
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_millis(gp::get_u64(
gp::PARAM_HTTP_TIMEOUT_MS,
)))
.build()
{
Ok(c) => c,
Err(_) => return vec![],
};
let mut commits = Vec::new();
for req in pending_requests {
if req.expires_at < current_height {
tracing::warn!(
" Oracle: request {} expired ({} < {})",
hex::encode(&req.request_id[..4]),
req.expires_at,
current_height
);
continue;
}
if req.body.len() > gp::get_usize(gp::PARAM_MAX_HTTP_BODY_BYTES) {
tracing::warn!(
" Oracle: request {} body too large",
hex::encode(&req.request_id[..4])
);
continue;
}
let result = execute_http_fetch(&client, req).await;
let (response_body, response_status) = match result {
Ok((body, status)) if body.len() <= gp::get_usize(gp::PARAM_MAX_RESPONSE_BYTES) => {
(body, status)
}
Ok((body, _)) => {
tracing::warn!(" Oracle: response too large ({} bytes)", body.len());
continue;
}
Err(e) => {
tracing::warn!(" Oracle: fetch error for {}: {}", req.url, e);
continue;
}
};
let canonical_body = match canonicalize_response(req.response_format, &response_body) {
Ok(v) => v,
Err(e) => {
tracing::warn!(" Oracle: canonicalize failed: {}", e);
continue;
}
};
let canonical_body = if let Some(schema_id) = req.schema_id {
match project_by_schema(schema_id, &canonical_body, schema_registry) {
Ok(projected) => projected,
Err(_) => continue, }
} else {
canonical_body
};
if canonical_body.len() > gp::get_usize(gp::PARAM_MAX_RESPONSE_BYTES) {
continue;
}
let commit_hash = compute_commit_hash(
validator_pk,
&req.request_id,
&canonical_body,
response_status,
);
commits.push(OracleCommitPayload {
request_id: req.request_id,
commit_hash,
response_body: canonical_body, response_status,
});
}
commits
}
fn canonicalize_response(format: UrlResponseFormat, body: &[u8]) -> Result<Vec<u8>, String> {
match format {
UrlResponseFormat::Raw => {
if body.first() == Some(&b'{') || body.first() == Some(&b'[') {
canonicalize_json(body).or_else(|_| Ok(body.to_vec()))
} else {
Ok(body.to_vec())
}
}
UrlResponseFormat::JsonCanonical => canonicalize_json(body),
UrlResponseFormat::PriceUsd => canonicalize_price_usd(body),
}
}
fn canonicalize_price_usd(body: &[u8]) -> Result<Vec<u8>, String> {
let price = parse_price_usd_micros(body).ok_or("price_usd not found")?;
let value: serde_json::Value = serde_json::from_slice(body).unwrap_or(serde_json::Value::Null);
let pair = value
.get("symbol")
.or_else(|| value.get("pair"))
.or_else(|| value.get("market"))
.and_then(|v| v.as_str())
.unwrap_or("BTC-USD");
serde_json::to_vec(&serde_json::json!({
"kind": "price_usd_sample",
"pair": pair,
"price_usd_micros": price
}))
.map_err(|e| format!("json encode error: {}", e))
}
fn parse_price_usd_micros(body: &[u8]) -> Option<u64> {
let value: serde_json::Value = serde_json::from_slice(body).ok()?;
let candidates = [
"price_usd_micros",
"price_usd_cents",
"price_usd",
"price",
"last",
"rate",
"amount",
"data.price",
"data.amount",
"result.price",
];
for path in candidates {
if let Some(v) = json_path(&value, path) {
let multiplier = match path {
"price_usd_micros" => 1.0,
"price_usd_cents" => 10_000.0,
_ => 1_000_000.0,
};
if let Some(n) = json_number(v) {
if n.is_finite() && n > 0.0 {
return Some((n * multiplier).round() as u64);
}
}
}
}
None
}
fn json_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
let mut cur = value;
for part in path.split('.') {
cur = cur.get(part)?;
}
Some(cur)
}
fn json_number(value: &serde_json::Value) -> Option<f64> {
match value {
serde_json::Value::Number(n) => n.as_f64(),
serde_json::Value::String(s) => s.replace(',', "").parse::<f64>().ok(),
_ => None,
}
}
fn canonicalize_json(body: &[u8]) -> Result<Vec<u8>, String> {
let value: serde_json::Value =
serde_json::from_slice(body).map_err(|e| format!("invalid json: {}", e))?;
let normalized = normalize_json(value);
serde_json::to_vec(&normalized).map_err(|e| format!("json encode error: {}", e))
}
const NON_DETERMINISTIC_FIELDS: &[&str] = &[
"timestamp",
"ts",
"time",
"date",
"datetime",
"created_at",
"updated_at",
"request_id",
"requestId",
"req_id",
"reqId",
"trace_id",
"traceId",
"nonce",
"random",
"seed",
"session_id",
"sessionId",
];
fn normalize_json(value: serde_json::Value) -> serde_json::Value {
match value {
serde_json::Value::Object(map) => {
let mut keys: Vec<String> = map
.keys()
.filter(|k| !NON_DETERMINISTIC_FIELDS.contains(&k.as_str()))
.cloned()
.collect();
keys.sort();
let mut new_map = serde_json::Map::new();
for key in keys {
if let Some(v) = map.get(&key) {
new_map.insert(key, normalize_json(v.clone()));
}
}
serde_json::Value::Object(new_map)
}
serde_json::Value::Array(items) => {
serde_json::Value::Array(items.into_iter().map(normalize_json).collect())
}
other => other,
}
}
fn project_by_schema(
schema_id: [u8; 32],
canonical_body: &[u8],
schema_registry: &im::HashMap<[u8; 32], SchemaEntry>,
) -> Result<Vec<u8>, String> {
let entry = match schema_registry.get(&schema_id) {
Some(e) if e.approved => e,
_ => return Err("schema not approved".into()),
};
let value: serde_json::Value =
serde_json::from_slice(canonical_body).map_err(|e| format!("invalid json: {}", e))?;
let obj = match value.as_object() {
Some(o) => o,
None => return Err("schema expects object at root".into()),
};
let mut projected = serde_json::Map::new();
let mut sorted_keys = entry.keys.clone();
sorted_keys.sort();
for key in &sorted_keys {
let val = resolve_path(obj, key);
match val {
Some(v) => {
projected.insert(key.clone(), v.clone());
}
None => return Err(format!("schema key {} not found in response", key)),
}
}
serde_json::to_vec(&serde_json::Value::Object(projected))
.map_err(|e| format!("json encode error: {}", e))
}
fn resolve_path<'a>(
obj: &'a serde_json::Map<String, serde_json::Value>,
path: &str,
) -> Option<&'a serde_json::Value> {
let mut parts = path.splitn(2, '.');
let key = parts.next()?;
let val = obj.get(key)?;
match parts.next() {
None => Some(val),
Some(rest) => val.as_object().and_then(|o| resolve_path(o, rest)),
}
}
pub fn url_response_format(
url: &str,
visibility: CellVisibility,
url_proposals: &im::HashMap<String, UrlProposal>,
) -> UrlResponseFormat {
match visibility {
CellVisibility::Private => UrlResponseFormat::Raw,
CellVisibility::Public => url_proposals
.values()
.find(|p| p.approved && url_matches_pattern(url, &p.url_pattern))
.map(|p| p.response_format)
.unwrap_or(UrlResponseFormat::Raw),
}
}
pub fn url_schema_id(
url: &str,
visibility: CellVisibility,
url_proposals: &im::HashMap<String, UrlProposal>,
) -> Option<[u8; 32]> {
match visibility {
CellVisibility::Private => None,
CellVisibility::Public => url_proposals
.values()
.find(|p| p.approved && url_matches_pattern(url, &p.url_pattern))
.and_then(|p| p.schema_id),
}
}
fn strip_accord_query_params(url: &str) -> String {
let Some((base, query)) = url.split_once('?') else {
return url.to_string();
};
let kept: Vec<&str> = query
.split('&')
.filter(|part| !part.starts_with("accord_format="))
.filter(|part| !part.is_empty())
.collect();
if kept.is_empty() {
base.to_string()
} else {
format!("{}?{}", base, kept.join("&"))
}
}
async fn execute_http_fetch(
client: &reqwest::Client,
req: &OracleRequest,
) -> Result<(Vec<u8>, u16), String> {
let fetch_url = strip_accord_query_params(&req.url);
let builder = match req.method.as_str() {
"GET" => client.get(&fetch_url),
"POST" => client.post(&fetch_url).body(req.body.clone()),
"PUT" => client.put(&fetch_url).body(req.body.clone()),
"DELETE" => client.delete(&fetch_url),
_ => return Err(format!("Unsupported method: {}", req.method)),
};
let response = builder
.send()
.await
.map_err(|e| format!("HTTP error: {}", e))?;
let status = response.status().as_u16();
if let Some(len) = response.content_length() {
if len as usize > gp::get_usize(gp::PARAM_MAX_RESPONSE_BYTES) {
return Err("Response too large".to_string());
}
}
let body = response
.bytes()
.await
.map_err(|e| format!("Body read error: {}", e))?
.to_vec();
if body.len() > gp::get_usize(gp::PARAM_MAX_RESPONSE_BYTES) {
return Err("Response too large".to_string());
}
Ok((body, status))
}
pub mod storage_keys {
pub fn oracle_request(req_id: &[u8; 32]) -> [u8; 32] {
let mut h = blake3::Hasher::new();
h.update(b"oracle:req:");
h.update(req_id);
(*h.finalize().as_bytes()).into()
}
pub fn oracle_tally(req_id: &[u8; 32]) -> [u8; 32] {
let mut h = blake3::Hasher::new();
h.update(b"oracle:tally:");
h.update(req_id);
(*h.finalize().as_bytes()).into()
}
pub fn oracle_result(req_id: &[u8; 32]) -> [u8; 32] {
let mut h = blake3::Hasher::new();
h.update(b"oracle:result:");
h.update(req_id);
(*h.finalize().as_bytes()).into()
}
pub fn url_proposal(pattern: &str) -> [u8; 32] {
let mut h = blake3::Hasher::new();
h.update(b"url:proposal:");
h.update(pattern.as_bytes());
(*h.finalize().as_bytes()).into()
}
pub fn cell_visibility(cell_id: &[u8; 32]) -> [u8; 32] {
let mut h = blake3::Hasher::new();
h.update(b"cell:vis:");
h.update(cell_id);
(*h.finalize().as_bytes()).into()
}
}