use crate::runtime::values::Value;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
/// A single record in the process-wide stream registry.
#[derive(Debug, Clone)]
pub struct StreamEntry {
    /// The source string that was handed to `stream` when the entry was created.
    pub source: String,
    /// Registration time, in milliseconds since the Unix epoch.
    pub created_at: u64,
}
/// Locks and returns the process-wide stream registry, creating it on first use.
///
/// # Panics
/// Panics if the registry mutex has been poisoned by a panic in another holder.
fn get_stream_registry() -> std::sync::MutexGuard<'static, HashMap<String, StreamEntry>> {
    static REG: OnceLock<Mutex<HashMap<String, StreamEntry>>> = OnceLock::new();
    // `get_or_init` already yields the initialised mutex, so lock it directly.
    let registry = REG.get_or_init(|| Mutex::new(HashMap::new()));
    registry.lock().unwrap()
}
/// Recursively converts a parsed JSON document into the runtime [`Value`] type.
///
/// Numbers become `Value::Int` when they fit in an `i64`, otherwise
/// `Value::Float` when representable as `f64`, and as a last resort their
/// textual form wrapped in `Value::String`.
#[cfg(feature = "http-interface")]
fn json_to_value(j: &serde_json::Value) -> Value {
    match j {
        serde_json::Value::Null => Value::Null,
        serde_json::Value::Bool(flag) => Value::Bool(*flag),
        serde_json::Value::String(text) => Value::String(text.clone()),
        serde_json::Value::Number(number) => number
            .as_i64()
            .map(Value::Int)
            .or_else(|| number.as_f64().map(Value::Float))
            .unwrap_or_else(|| Value::String(number.to_string())),
        serde_json::Value::Array(items) => {
            Value::List(items.iter().map(json_to_value).collect())
        }
        serde_json::Value::Object(fields) => {
            let converted: HashMap<String, Value> = fields
                .iter()
                .map(|(key, inner)| (key.clone(), json_to_value(inner)))
                .collect();
            Value::Map(converted)
        }
    }
}
/// Description of one oracle endpoint plus its client-side request state.
#[derive(Debug, Clone)]
pub struct OracleSource {
    /// Name under which the source is registered with a security manager.
    pub name: String,
    /// Endpoint URL of the source.
    pub url: String,
    /// Optional API key for the source.
    pub api_key: Option<String>,
    /// Optional limit in requests per second, enforced by `can_request`.
    pub rate_limit: Option<i64>,
    /// Whether the source has been marked as trusted.
    pub trusted: bool,
    /// Key material used when checking response signatures from this source.
    pub public_key: Option<String>,
    /// Millisecond timestamp of the last request that `can_request` allowed.
    pub last_request_time: Option<u64>,
}
/// Parameters describing a single oracle request.
#[derive(Debug, Clone)]
pub struct OracleQuery {
    /// Free-form identifier of the kind of data being requested.
    pub query_type: String,
    /// Named query parameters.
    pub parameters: HashMap<String, Value>,
    /// Request timeout in seconds; `fetch` falls back to 30 when unset.
    pub timeout: Option<i64>,
    /// When true, `fetch` rejects responses that carry no signature.
    pub require_signature: bool,
    /// Requested minimum number of confirmations.
    // NOTE(review): nothing in this file reads this field — confirm where it is enforced.
    pub min_confirmations: Option<u32>,
}
/// Data returned by an oracle together with its provenance metadata.
#[derive(Debug, Clone)]
pub struct OracleResponse {
    /// The payload delivered by the oracle.
    pub data: Value,
    /// Millisecond timestamp (since the Unix epoch) attached to the response.
    pub timestamp: u64,
    /// Identifier of the source that produced the response.
    pub source: String,
    /// Signature accompanying the payload, if any.
    pub signature: Option<String>,
    /// Whether the signature has been verified; `fetch` always sets this to false.
    pub verified: bool,
    /// Confidence in the payload: 1.0 from `fetch`, or the agreement ratio
    /// when the response comes out of a consensus round.
    pub confidence_score: f64,
}
/// Keeps the set of trusted sources and a cache of previously seen responses.
#[derive(Debug, Clone)]
pub struct OracleSecurityManager {
    /// Trusted sources, keyed by source name.
    trusted_sources: HashMap<String, OracleSource>,
    /// Cached responses keyed by cache key, each paired with the millisecond
    /// timestamp that is checked against `max_age_seconds` on lookup.
    response_cache: HashMap<String, (OracleResponse, u64)>,
    /// Maximum accepted age of a timestamp, in seconds.
    max_age_seconds: u64,
}
/// Collects responses from several sources and decides whether they agree.
#[derive(Debug, Clone)]
pub struct OracleConsensus {
    /// Identifiers of the sources taking part in the round.
    pub sources: Vec<String>,
    /// Responses gathered so far.
    pub responses: Vec<OracleResponse>,
    /// Minimum fraction of the gathered responses that must share the same
    /// data for consensus to be reached (for example 0.66).
    pub consensus_threshold: f64,
}
impl OracleSource {
    /// Creates an untrusted source with no API key, no rate limit and no
    /// recorded request history.
    pub fn new(name: String, url: String) -> Self {
        Self {
            name,
            url,
            api_key: None,
            rate_limit: None,
            trusted: false,
            public_key: None,
            last_request_time: None,
        }
    }

    /// Returns the source with `api_key` stored.
    pub fn with_api_key(mut self, api_key: String) -> Self {
        self.api_key = Some(api_key);
        self
    }

    /// Returns the source with a rate limit expressed in requests per second.
    pub fn with_rate_limit(mut self, rate_limit: i64) -> Self {
        self.rate_limit = Some(rate_limit);
        self
    }

    /// Marks the source as trusted and stores the key used to check its signatures.
    pub fn with_trust(mut self, public_key: String) -> Self {
        self.trusted = true;
        self.public_key = Some(public_key);
        self
    }

    /// Reports whether a request may be issued now and, if so, records the
    /// current time as the time of the last request.
    ///
    /// With a positive rate limit of `r` requests per second, a request is
    /// refused while fewer than `1000 / r` milliseconds (integer division)
    /// have passed since the last allowed request. A missing, zero or
    /// negative rate limit applies no throttling.
    pub fn can_request(&mut self) -> bool {
        let now = get_current_timestamp();

        // Only a strictly positive limit yields a meaningful interval. A zero
        // limit used to divide by zero here, so it is treated like "no limit",
        // which is also what the first call with a zero limit always did.
        let min_interval = self
            .rate_limit
            .filter(|&limit| limit > 0)
            .map(|limit| 1000 / (limit as u64));

        if let (Some(min_interval), Some(last_time)) = (min_interval, self.last_request_time) {
            // `checked_sub` is `None` when the wall clock moved backwards; in
            // that case allow the request and re-anchor to the new time rather
            // than underflowing.
            let too_soon = now
                .checked_sub(last_time)
                .map_or(false, |elapsed| elapsed < min_interval);
            if too_soon {
                return false;
            }
        }

        self.last_request_time = Some(now);
        true
    }
}
impl OracleQuery {
    /// Creates a query of the given type with no parameters, no timeout, no
    /// confirmation requirement, and signatures required.
    pub fn new(query_type: String) -> Self {
        let parameters = HashMap::new();
        Self {
            query_type,
            parameters,
            timeout: None,
            require_signature: true,
            min_confirmations: None,
        }
    }

    /// Returns the query with `key` set to `value`, replacing any earlier value.
    pub fn with_parameter(mut self, key: String, value: Value) -> Self {
        let _previous = self.parameters.insert(key, value);
        self
    }

    /// Returns the query with the timeout (in seconds) set.
    pub fn with_timeout(self, timeout: i64) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Returns the query with the signature requirement switched on or off.
    pub fn require_signature(self, required: bool) -> Self {
        Self {
            require_signature: required,
            ..self
        }
    }

    /// Returns the query with the minimum confirmation count set.
    pub fn with_confirmations(self, count: u32) -> Self {
        Self {
            min_confirmations: Some(count),
            ..self
        }
    }
}
impl OracleSecurityManager {
    /// Creates a manager with no trusted sources, an empty cache and a
    /// maximum timestamp age of 300 seconds.
    pub fn new() -> Self {
        Self {
            trusted_sources: HashMap::new(),
            response_cache: HashMap::new(),
            max_age_seconds: 300,
        }
    }

    /// Registers `source` under its name.
    ///
    /// Sources that are not marked trusted, or that have no public key, are
    /// silently ignored.
    pub fn register_trusted_source(&mut self, source: OracleSource) {
        if !source.trusted || source.public_key.is_none() {
            return;
        }
        let name = source.name.clone();
        self.trusted_sources.insert(name, source);
    }

    /// Returns true only when the response carries a signature, its source is
    /// registered with a public key, and the signature checks out against
    /// that key.
    pub fn verify_response(&self, response: &OracleResponse) -> bool {
        let signature = match &response.signature {
            Some(signature) => signature,
            None => return false,
        };
        let registered_key = self
            .trusted_sources
            .get(&response.source)
            .and_then(|source| source.public_key.as_ref());
        match registered_key {
            Some(public_key) => verify_signature(&response.data, signature, public_key),
            None => false,
        }
    }

    /// Returns true when `timestamp` (milliseconds since the epoch) is no
    /// older than the configured maximum age. Timestamps in the future count
    /// as age zero.
    pub fn validate_timestamp(&self, timestamp: u64) -> bool {
        let current = get_current_timestamp();
        let age_ms = current.saturating_sub(timestamp);
        let limit_ms = self.max_age_seconds * 1000;
        age_ms <= limit_ms
    }

    /// Returns a clone of the cached response for `cache_key` if one exists
    /// and its cache time is still within the maximum age.
    pub fn get_cached(&self, cache_key: &str) -> Option<OracleResponse> {
        self.response_cache
            .get(cache_key)
            .filter(|(_, cached_time)| self.validate_timestamp(*cached_time))
            .map(|(response, _)| response.clone())
    }
}
impl Default for OracleSecurityManager {
fn default() -> Self {
Self::new()
}
}
impl OracleConsensus {
    /// Creates an empty consensus round for `sources`, requiring at least
    /// `threshold` (a fraction such as 0.66) of the responses to agree.
    pub fn new(sources: Vec<String>, threshold: f64) -> Self {
        Self {
            sources,
            responses: Vec::new(),
            consensus_threshold: threshold,
        }
    }

    /// Adds one response to the round.
    pub fn add_response(&mut self, response: OracleResponse) {
        self.responses.push(response);
    }

    /// Returns the agreed response, or `None` if there are no responses or no
    /// value reaches the threshold.
    ///
    /// Responses are grouped by the `Debug` rendering of their data. The
    /// largest group wins; if several groups are equally large, the one whose
    /// first response was added earliest wins. The result is a clone of that
    /// group's first response with `confidence_score` set to the group's
    /// share of all responses.
    ///
    /// Agreement is measured against the number of responses received, not
    /// against the number of entries in `sources`.
    // NOTE(review): grouping relies on `Debug` output being identical for equal
    // values; confirm this holds for map-like `Value` variants.
    pub fn get_consensus(&self) -> Option<OracleResponse> {
        let total = self.responses.len();
        if total == 0 {
            return None;
        }

        // Rendered data -> (number of responses, index of the first such response).
        let mut tallies: HashMap<String, (usize, usize)> = HashMap::new();
        for (index, response) in self.responses.iter().enumerate() {
            let key = format!("{:?}", response.data);
            tallies.entry(key).or_insert((0, index)).0 += 1;
        }

        // Choose the winner explicitly instead of relying on HashMap iteration
        // order, which is arbitrary: with a threshold of 0.5 or less, more than
        // one group can qualify and the result would otherwise vary per run.
        let (count, first_index) = tallies
            .values()
            .copied()
            .max_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)))?;

        let agreement = count as f64 / total as f64;
        if agreement >= self.consensus_threshold {
            let mut consensus_response = self.responses[first_index].clone();
            consensus_response.confidence_score = agreement;
            Some(consensus_response)
        } else {
            None
        }
    }
}
pub fn fetch(source: &str, query: OracleQuery) -> Result<OracleResponse, String> {
let timestamp = get_current_timestamp();
#[cfg(feature = "http-interface")]
if source.starts_with("http://") || source.starts_with("https://") {
let client = reqwest::blocking::Client::builder()
.timeout(std::time::Duration::from_secs(
query.timeout.unwrap_or(30) as u64
))
.build()
.map_err(|e| e.to_string())?;
let resp = client.get(source).send().map_err(|e| e.to_string())?;
if !resp.status().is_success() {
return Err(format!("Oracle fetch failed: {}", resp.status()));
}
let signature = resp
.headers()
.get("X-Signature")
.and_then(|v| v.to_str().ok())
.map(String::from);
let body: serde_json::Value = resp.json().map_err(|e| e.to_string())?;
let data = json_to_value(&body);
if query.require_signature && signature.is_none() {
return Err("Signature required but not provided".to_string());
}
return Ok(OracleResponse {
data,
timestamp,
source: source.to_string(),
signature,
verified: false,
confidence_score: 1.0,
});
}
#[cfg(feature = "http-interface")]
{
return Err(format!(
"Oracle source must be an HTTP/HTTPS URL. Got: '{}'. Use oracle::create_source() to register sources, or provide a URL directly.",
source
));
}
#[cfg(not(feature = "http-interface"))]
{
return Err(format!(
"Oracle fetching requires the 'http-interface' feature. Source: '{}'",
source
));
}
}
/// Fetches the same query from every source in turn and returns the response
/// the sources agree on.
///
/// Sources that fail are reported on stderr and left out of the tally, so
/// agreement is measured among the sources that answered.
///
/// # Errors
/// Returns an error string when no value reaches `threshold`, which includes
/// the case where no source answered.
pub fn fetch_with_consensus(
    sources: Vec<&str>,
    query: OracleQuery,
    threshold: f64,
) -> Result<OracleResponse, String> {
    let source_names: Vec<String> = sources.iter().map(|name| name.to_string()).collect();
    let mut tally = OracleConsensus::new(source_names, threshold);

    for endpoint in sources {
        let outcome = fetch(endpoint, query.clone());
        match outcome {
            Err(reason) => eprintln!("Oracle source {} failed: {}", endpoint, reason),
            Ok(reply) => tally.add_response(reply),
        }
    }

    match tally.get_consensus() {
        Some(agreed) => Ok(agreed),
        None => Err(format!(
            "Failed to reach consensus (threshold: {:.1}%)",
            threshold * 100.0
        )),
    }
}
/// Checks `signature` over `data` using the built-in default key.
// NOTE(review): the key is a fixed string literal, so anyone who knows it can
// produce a matching signature — confirm this is intended outside of tests.
pub fn verify(data: &Value, signature: &str) -> bool {
    let default_key: &str = "default_public_key";
    verify_signature(data, signature, default_key)
}
/// Registers a stream for `source` in the process-wide registry and returns
/// its identifier.
///
/// For `ws://` and `wss://` sources the id is `stream_ws_` followed by a hash
/// of the source alone, so registering the same source again reuses the id
/// and replaces the entry. For any other source the id is `stream_` followed
/// by a hash of the source and the creation time.
///
/// No connection is opened here and `_callback` is not used. The function
/// currently never returns `Err`.
pub fn stream(source: &str, _callback: &str) -> Result<String, String> {
    let created_at = get_current_timestamp();
    let is_websocket = source.starts_with("ws://") || source.starts_with("wss://");

    let mut hasher = Sha256::new();
    hasher.update(source.as_bytes());
    if !is_websocket {
        // Non-websocket ids also depend on the creation time.
        hasher.update(created_at.to_be_bytes().as_slice());
    }
    let digest = hasher.finalize();

    // The id uses the first eight digest bytes, read as a big-endian integer.
    let mut leading = [0u8; 8];
    leading.copy_from_slice(&digest[..8]);
    let short_hash = u64::from_be_bytes(leading);

    let stream_id = if is_websocket {
        format!("stream_ws_{:x}", short_hash)
    } else {
        format!("stream_{:x}", short_hash)
    };

    let entry = StreamEntry {
        source: source.to_string(),
        created_at,
    };
    get_stream_registry().insert(stream_id.clone(), entry);
    Ok(stream_id)
}
/// Returns a copy of the registry entry for `stream_id`, if one exists.
pub fn get_stream(stream_id: &str) -> Option<StreamEntry> {
    let registry = get_stream_registry();
    registry.get(stream_id).cloned()
}
/// Removes `stream_id` from the registry; returns true if it was registered.
pub fn close_stream(stream_id: &str) -> bool {
    let mut registry = get_stream_registry();
    let removed = registry.remove(stream_id);
    removed.is_some()
}
/// Convenience wrapper around `OracleSource::new`.
pub fn create_source(name: String, url: String) -> OracleSource {
    let source = OracleSource::new(name, url);
    source
}
/// Convenience wrapper around `OracleQuery::new`.
pub fn create_query(query_type: String) -> OracleQuery {
    let query = OracleQuery::new(query_type);
    query
}
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn get_current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Produces the lowercase hex SHA-256 digest of the `Debug` rendering of
/// `data` followed by `private_key`.
// NOTE(review): this is a keyed hash rather than an asymmetric signature; the
// verifier has to hold the same key string that was used here.
fn generate_signature(data: &Value, private_key: &str) -> String {
    let rendered = format!("{:?}", data);

    let mut digest = Sha256::new();
    digest.update(rendered.as_bytes());
    digest.update(private_key.as_bytes());

    let tag = digest.finalize();
    format!("{:x}", tag)
}
/// Returns true when `signature` equals the signature generated for `data`
/// with `public_key`.
///
/// After the length check, every byte pair is compared with no early exit,
/// so the loop does the same amount of work wherever a mismatch occurs.
fn verify_signature(data: &Value, signature: &str, public_key: &str) -> bool {
    let expected = generate_signature(data, public_key);
    if expected.len() != signature.len() {
        return false;
    }

    // Accumulate all byte differences; the result is zero only if none differ.
    let difference = signature
        .bytes()
        .zip(expected.bytes())
        .fold(0u8, |acc, (got, want)| acc | (got ^ want));
    difference == 0
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a verified, fully confident response stamped with the current time.
    fn sample_response(data: Value, source: &str, signature: &str) -> OracleResponse {
        OracleResponse {
            data,
            timestamp: get_current_timestamp(),
            source: source.to_string(),
            signature: Some(signature.to_string()),
            verified: true,
            confidence_score: 1.0,
        }
    }

    /// A trusted source with a key is stored under its name.
    #[test]
    fn test_oracle_security_manager() {
        let trusted = OracleSource::new(
            "test_source".to_string(),
            "https://api.test.com".to_string(),
        )
        .with_trust("test_public_key".to_string());

        let mut manager = OracleSecurityManager::new();
        manager.register_trusted_source(trusted);

        assert!(manager.trusted_sources.contains_key("test_source"));
    }

    /// Timestamps within the 300 second window pass; older ones are rejected.
    #[test]
    fn test_timestamp_validation() {
        let manager = OracleSecurityManager::new();
        let now = get_current_timestamp();

        let cases: [(u64, bool); 3] = [(0, true), (1000, true), (400000, false)];
        for (age_ms, expected) in cases.iter() {
            assert!(manager.validate_timestamp(now - *age_ms) == *expected);
        }
    }

    /// A generated signature verifies only with the same data, key and text.
    #[test]
    fn test_signature_generation_and_verification() {
        let payload = Value::Int(12345);
        let signature = generate_signature(&payload, "test_key");

        assert!(verify_signature(&payload, &signature, "test_key"));
        assert!(!verify_signature(&payload, &signature, "wrong_key"));
        assert!(!verify_signature(&payload, "wrong_sig", "test_key"));
    }

    /// Non-URL sources are rejected up front; URL sources fail for other reasons.
    #[test]
    #[cfg(feature = "http-interface")]
    fn test_oracle_fetch_with_security() {
        let query = OracleQuery::new("btc_price".to_string()).require_signature(true);

        let not_a_url = fetch("price_feed", query.clone());
        assert!(not_a_url.is_err());
        let message = not_a_url.unwrap_err();
        assert!(message.contains("Oracle source must be an HTTP/HTTPS URL"));

        let unreachable = fetch("https://api.example.com/oracle", query);
        assert!(unreachable.is_err());
        let message = unreachable.unwrap_err();
        assert!(!message.contains("must be an HTTP/HTTPS URL"));
    }

    /// Without the feature, every fetch reports that the feature is required.
    #[test]
    #[cfg(not(feature = "http-interface"))]
    fn test_oracle_fetch_requires_http_interface() {
        let query = OracleQuery::new("btc_price".to_string());

        let outcome = fetch("https://api.example.com/oracle", query);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("requires the 'http-interface' feature"));
    }

    /// The first request is always allowed; the second result is timing
    /// dependent and is deliberately not asserted.
    #[test]
    fn test_rate_limiting() {
        let mut limited =
            OracleSource::new("test".to_string(), "url".to_string()).with_rate_limit(10);

        assert!(limited.can_request());
        let _ = limited.can_request();
    }

    /// Two of three sources agreeing on 100 clears a 0.66 threshold.
    #[test]
    fn test_consensus_validation() {
        let participants = vec![
            "source1".to_string(),
            "source2".to_string(),
            "source3".to_string(),
        ];
        let mut consensus = OracleConsensus::new(participants, 0.66);

        consensus.add_response(sample_response(Value::Int(100), "source1", "sig1"));
        consensus.add_response(sample_response(Value::Int(100), "source2", "sig2"));
        consensus.add_response(sample_response(Value::Int(99), "source3", "sig3"));

        let outcome = consensus.get_consensus();
        assert!(outcome.is_some());

        let agreed = outcome.unwrap();
        assert!(matches!(agreed.data, Value::Int(100)));
        assert!(agreed.confidence_score >= 0.66);
    }
}