use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, NaiveDateTime, Utc};
use reqwest::{Client, StatusCode};
use serde::Deserialize;
use tokio::time::sleep;
use crate::api_clients::SimpleEmbedder;
use crate::ruvector_native::{Domain, SemanticVector};
use crate::{FrameworkError, Result};
/// Pause before each USGS request, in milliseconds.
const USGS_RATE_LIMIT_MS: u64 = 200;
/// Pause before each CERN Open Data request, in milliseconds.
const CERN_RATE_LIMIT_MS: u64 = 500;
/// Rate-limit delay configured on the Argo client, in milliseconds.
const ARGO_RATE_LIMIT_MS: u64 = 300;
/// Pause before each Materials Project request, in milliseconds.
const MATERIALS_PROJECT_RATE_LIMIT_MS: u64 = 1000;
/// Maximum number of retries after the initial attempt of a request.
const MAX_RETRIES: u32 = 3;
/// Base back-off in milliseconds; retry number `k` waits `k * RETRY_DELAY_MS`.
const RETRY_DELAY_MS: u64 = 1000;
/// Great-circle helpers on a spherical Earth model.
pub struct GeoUtils;

impl GeoUtils {
    /// Mean Earth radius used by the haversine formula, in kilometres.
    const EARTH_RADIUS_KM: f64 = 6371.0;

    /// Great-circle distance in kilometres between two points given in decimal degrees,
    /// computed with the haversine formula.
    ///
    /// The haversine term `a` is clamped to `[0, 1]`. For (nearly) antipodal points,
    /// floating-point rounding can push it marginally above 1. Without the clamp,
    /// `sqrt(1 - a)` would be NaN, and that NaN would poison every downstream comparison
    /// (e.g. `within_radius` would silently return `false`).
    pub fn distance_km(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
        let dlat = (lat2 - lat1).to_radians();
        let dlon = (lon2 - lon1).to_radians();
        let a = ((dlat / 2.0).sin().powi(2)
            + lat1.to_radians().cos() * lat2.to_radians().cos() * (dlon / 2.0).sin().powi(2))
        .clamp(0.0, 1.0);
        let c = 2.0 * a.sqrt().atan2((1.0 - a).sqrt());
        Self::EARTH_RADIUS_KM * c
    }

    /// Returns `true` when the point lies within `radius_km` of the centre.
    ///
    /// The boundary is inclusive: a distance equal to `radius_km` counts as inside.
    pub fn within_radius(
        center_lat: f64,
        center_lon: f64,
        point_lat: f64,
        point_lon: f64,
        radius_km: f64,
    ) -> bool {
        Self::distance_km(center_lat, center_lon, point_lat, point_lon) <= radius_km
    }
}
/// GeoJSON `FeatureCollection` returned by the USGS `query` endpoint.
#[derive(Debug, Deserialize)]
struct UsgsGeoJsonResponse {
    /// Earthquake events; an absent `features` key decodes as an empty list.
    #[serde(default)]
    features: Vec<UsgsEarthquakeFeature>,
    /// Collection metadata; an absent `metadata` key decodes as the default.
    #[serde(default)]
    metadata: UsgsMetadata,
}

/// Collection-level `metadata` object of a USGS response.
#[derive(Debug, Deserialize, Default)]
struct UsgsMetadata {
    /// `count` as reported by the server (`0` when absent).
    #[serde(default)]
    count: u32,
}

/// One earthquake event (a GeoJSON `Feature`). All three fields are required.
#[derive(Debug, Deserialize)]
struct UsgsEarthquakeFeature {
    /// USGS event id; the client prefixes it with `USGS:` to build the vector id.
    id: String,
    /// Event attributes.
    properties: UsgsProperties,
    /// Event location.
    geometry: UsgsGeometry,
}

/// The `properties` object of a USGS event.
///
/// Every field carries `#[serde(default)]`, so a *missing* key decodes to `0`, an empty
/// string or `None`. `default` does not cover an explicit JSON `null`, which still fails
/// to decode for the non-`Option` fields.
#[derive(Debug, Deserialize)]
struct UsgsProperties {
    /// Event magnitude, when reported.
    #[serde(default)]
    mag: Option<f64>,
    #[serde(default)]
    place: String,
    /// Origin time; the client decodes it as milliseconds since the Unix epoch.
    #[serde(default)]
    time: i64,
    #[serde(default)]
    updated: i64,
    #[serde(default)]
    tz: Option<i32>,
    #[serde(default)]
    url: String,
    #[serde(default)]
    detail: String,
    #[serde(default)]
    felt: Option<u32>,
    #[serde(default)]
    cdi: Option<f64>,
    #[serde(default)]
    mmi: Option<f64>,
    #[serde(default)]
    alert: Option<String>,
    #[serde(default)]
    status: String,
    #[serde(default)]
    tsunami: u8,
    /// Significance score; `get_significant` keeps events with `sig >= 600`.
    #[serde(default)]
    sig: u32,
    #[serde(default)]
    net: String,
    #[serde(default)]
    code: String,
    /// Event type (`type` is a Rust keyword, hence the raw identifier).
    #[serde(default)]
    r#type: String,
    #[serde(default)]
    title: String,
}

/// GeoJSON geometry of an event.
#[derive(Debug, Deserialize)]
struct UsgsGeometry {
    /// Read by the client as `[longitude, latitude, depth_km]`; missing entries become `0.0`.
    coordinates: Vec<f64>,
}
pub struct UsgsEarthquakeClient {
client: Client,
base_url: String,
rate_limit_delay: Duration,
embedder: Arc<SimpleEmbedder>,
}
impl UsgsEarthquakeClient {
pub fn new() -> Result<Self> {
let client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(FrameworkError::Network)?;
Ok(Self {
client,
base_url: "https://earthquake.usgs.gov/fdsnws/event/1".to_string(),
rate_limit_delay: Duration::from_millis(USGS_RATE_LIMIT_MS),
embedder: Arc::new(SimpleEmbedder::new(256)),
})
}
pub async fn get_recent(
&self,
min_magnitude: f64,
days: u32,
) -> Result<Vec<SemanticVector>> {
let now = Utc::now();
let start_time = now - chrono::Duration::days(days as i64);
let url = format!(
"{}/query?format=geojson&starttime={}&endtime={}&minmagnitude={}",
self.base_url,
start_time.format("%Y-%m-%d"),
now.format("%Y-%m-%d"),
min_magnitude
);
sleep(self.rate_limit_delay).await;
let response = self.fetch_with_retry(&url).await?;
let geojson: UsgsGeoJsonResponse = response.json().await?;
self.convert_earthquakes(geojson.features)
}
pub async fn search_by_region(
&self,
lat: f64,
lon: f64,
radius_km: f64,
days: u32,
) -> Result<Vec<SemanticVector>> {
let now = Utc::now();
let start_time = now - chrono::Duration::days(days as i64);
let url = format!(
"{}/query?format=geojson&starttime={}&endtime={}&latitude={}&longitude={}&maxradiuskm={}",
self.base_url,
start_time.format("%Y-%m-%d"),
now.format("%Y-%m-%d"),
lat,
lon,
radius_km
);
sleep(self.rate_limit_delay).await;
let response = self.fetch_with_retry(&url).await?;
let geojson: UsgsGeoJsonResponse = response.json().await?;
self.convert_earthquakes(geojson.features)
}
pub async fn get_significant(&self, days: u32) -> Result<Vec<SemanticVector>> {
let now = Utc::now();
let start_time = now - chrono::Duration::days(days as i64);
let url = format!(
"{}/query?format=geojson&starttime={}&endtime={}&orderby=magnitude&limit=100",
self.base_url,
start_time.format("%Y-%m-%d"),
now.format("%Y-%m-%d")
);
sleep(self.rate_limit_delay).await;
let response = self.fetch_with_retry(&url).await?;
let geojson: UsgsGeoJsonResponse = response.json().await?;
let significant: Vec<_> = geojson
.features
.into_iter()
.filter(|f| {
f.properties.mag.unwrap_or(0.0) >= 6.0 || f.properties.sig >= 600
})
.collect();
self.convert_earthquakes(significant)
}
pub async fn get_by_magnitude_range(
&self,
min: f64,
max: f64,
days: u32,
) -> Result<Vec<SemanticVector>> {
let now = Utc::now();
let start_time = now - chrono::Duration::days(days as i64);
let url = format!(
"{}/query?format=geojson&starttime={}&endtime={}&minmagnitude={}&maxmagnitude={}",
self.base_url,
start_time.format("%Y-%m-%d"),
now.format("%Y-%m-%d"),
min,
max
);
sleep(self.rate_limit_delay).await;
let response = self.fetch_with_retry(&url).await?;
let geojson: UsgsGeoJsonResponse = response.json().await?;
self.convert_earthquakes(geojson.features)
}
fn convert_earthquakes(&self, features: Vec<UsgsEarthquakeFeature>) -> Result<Vec<SemanticVector>> {
let mut vectors = Vec::new();
for feature in features {
let mag = feature.properties.mag.unwrap_or(0.0);
let coords = &feature.geometry.coordinates;
let lon = coords.get(0).copied().unwrap_or(0.0);
let lat = coords.get(1).copied().unwrap_or(0.0);
let depth = coords.get(2).copied().unwrap_or(0.0);
let timestamp = DateTime::from_timestamp_millis(feature.properties.time)
.unwrap_or_else(Utc::now);
let text = format!(
"Magnitude {} earthquake {} at depth {}km (lat: {}, lon: {})",
mag, feature.properties.place, depth, lat, lon
);
let embedding = self.embedder.embed_text(&text);
let mut metadata = HashMap::new();
metadata.insert("magnitude".to_string(), mag.to_string());
metadata.insert("place".to_string(), feature.properties.place);
metadata.insert("latitude".to_string(), lat.to_string());
metadata.insert("longitude".to_string(), lon.to_string());
metadata.insert("depth_km".to_string(), depth.to_string());
metadata.insert("tsunami".to_string(), feature.properties.tsunami.to_string());
metadata.insert("significance".to_string(), feature.properties.sig.to_string());
metadata.insert("status".to_string(), feature.properties.status);
if let Some(alert) = feature.properties.alert {
metadata.insert("alert".to_string(), alert);
}
metadata.insert("source".to_string(), "usgs".to_string());
vectors.push(SemanticVector {
id: format!("USGS:{}", feature.id),
embedding,
domain: Domain::Seismic,
timestamp,
metadata,
});
}
Ok(vectors)
}
async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
let mut retries = 0;
loop {
match self.client.get(url).send().await {
Ok(response) => {
if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES {
retries += 1;
sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
continue;
}
return Ok(response);
}
Err(_) if retries < MAX_RETRIES => {
retries += 1;
sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
}
Err(e) => return Err(FrameworkError::Network(e)),
}
}
}
}
impl Default for UsgsEarthquakeClient {
    /// Equivalent to [`UsgsEarthquakeClient::new`], but panics on failure.
    ///
    /// # Panics
    ///
    /// Panics with `Failed to create USGS client: <error>` if `new` returns an error.
    fn default() -> Self {
        match Self::new() {
            Ok(client) => client,
            Err(err) => panic!("Failed to create USGS client: {:?}", err),
        }
    }
}
/// A single record from the CERN Open Data records API.
#[derive(Debug, Deserialize)]
struct CernRecord {
    /// Numeric record id (required); used as `recid` and in the vector id.
    id: u64,
    /// Descriptive metadata; an absent `metadata` key decodes as the default.
    #[serde(default)]
    metadata: CernMetadata,
}

/// Descriptive metadata of a CERN record. Every field defaults when the key is missing.
#[derive(Debug, Deserialize, Default)]
struct CernMetadata {
    /// Titles; the client uses the first one as the display title.
    #[serde(default)]
    titles: Vec<CernTitle>,
    /// Abstract (`abstract` is a Rust keyword, hence the raw identifier).
    #[serde(default)]
    r#abstract: Option<CernAbstract>,
    #[serde(default)]
    experiment: Option<String>,
    #[serde(default)]
    collision_information: Option<CernCollisionInfo>,
    /// Creation dates as strings; the client only consults the first entry.
    #[serde(default)]
    date_created: Vec<String>,
    /// Not read by the code visible in this file.
    #[serde(default)]
    keywords: Vec<String>,
    /// Record classification (`type` is a Rust keyword, hence the raw identifier).
    #[serde(default)]
    r#type: CernType,
}

/// One title entry; `title` is required.
#[derive(Debug, Deserialize)]
struct CernTitle {
    title: String,
}

/// Abstract object. `description` is required: an abstract without it fails the whole record.
#[derive(Debug, Deserialize)]
struct CernAbstract {
    description: String,
}

/// Collision parameters of a dataset; both fields default to empty strings.
#[derive(Debug, Deserialize)]
struct CernCollisionInfo {
    #[serde(default)]
    energy: String,
    #[serde(default)]
    r#type: String,
}

/// Primary/secondary record type.
#[derive(Debug, Deserialize, Default)]
struct CernType {
    #[serde(default)]
    primary: String,
    #[serde(default)]
    secondary: Vec<String>,
}

/// Envelope of a records search response.
#[derive(Debug, Deserialize)]
struct CernSearchResponse {
    #[serde(default)]
    hits: CernHits,
}

/// The `hits` object of a search response.
#[derive(Debug, Deserialize, Default)]
struct CernHits {
    /// Matching records in the returned page.
    #[serde(default)]
    hits: Vec<CernRecord>,
    #[serde(default)]
    total: u32,
}
pub struct CernOpenDataClient {
client: Client,
base_url: String,
rate_limit_delay: Duration,
embedder: Arc<SimpleEmbedder>,
}
impl CernOpenDataClient {
pub fn new() -> Result<Self> {
let client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(FrameworkError::Network)?;
Ok(Self {
client,
base_url: "https://opendata.cern.ch/api/records".to_string(),
rate_limit_delay: Duration::from_millis(CERN_RATE_LIMIT_MS),
embedder: Arc::new(SimpleEmbedder::new(256)),
})
}
pub async fn search_datasets(&self, query: &str) -> Result<Vec<SemanticVector>> {
let url = format!(
"{}?q={}&size=50",
self.base_url,
urlencoding::encode(query)
);
sleep(self.rate_limit_delay).await;
let response = self.fetch_with_retry(&url).await?;
let search_response: CernSearchResponse = response.json().await?;
self.convert_records(search_response.hits.hits)
}
pub async fn get_dataset(&self, recid: u64) -> Result<Vec<SemanticVector>> {
let url = format!("{}/{}", self.base_url, recid);
sleep(self.rate_limit_delay).await;
let response = self.fetch_with_retry(&url).await?;
let record: CernRecord = response.json().await?;
self.convert_records(vec![record])
}
pub async fn search_by_experiment(&self, experiment: &str) -> Result<Vec<SemanticVector>> {
let url = format!(
"{}?experiment={}&size=50",
self.base_url,
urlencoding::encode(experiment)
);
sleep(self.rate_limit_delay).await;
let response = self.fetch_with_retry(&url).await?;
let search_response: CernSearchResponse = response.json().await?;
self.convert_records(search_response.hits.hits)
}
fn convert_records(&self, records: Vec<CernRecord>) -> Result<Vec<SemanticVector>> {
let mut vectors = Vec::new();
for record in records {
let title = record
.metadata
.titles
.first()
.map(|t| t.title.clone())
.unwrap_or_else(|| format!("Dataset {}", record.id));
let description = record
.metadata
.r#abstract
.as_ref()
.map(|a| a.description.clone())
.unwrap_or_default();
let experiment = record.metadata.experiment.unwrap_or_default();
let collision_energy = record
.metadata
.collision_information
.as_ref()
.map(|c| c.energy.clone())
.unwrap_or_default();
let collision_type = record
.metadata
.collision_information
.as_ref()
.map(|c| c.r#type.clone())
.unwrap_or_default();
let text = format!(
"{} {} {} {} {}",
title,
description,
experiment,
collision_energy,
collision_type
);
let embedding = self.embedder.embed_text(&text);
let mut metadata = HashMap::new();
metadata.insert("recid".to_string(), record.id.to_string());
metadata.insert("title".to_string(), title);
metadata.insert("experiment".to_string(), experiment);
metadata.insert("collision_energy".to_string(), collision_energy);
metadata.insert("collision_type".to_string(), collision_type);
metadata.insert("data_type".to_string(), record.metadata.r#type.primary);
metadata.insert("source".to_string(), "cern".to_string());
let date = record
.metadata
.date_created
.first()
.and_then(|d| NaiveDateTime::parse_from_str(d, "%Y-%m-%d %H:%M:%S").ok())
.or_else(|| {
record
.metadata
.date_created
.first()
.and_then(|d| NaiveDateTime::parse_from_str(d, "%Y").ok())
})
.map(|dt| dt.and_utc())
.unwrap_or_else(Utc::now);
vectors.push(SemanticVector {
id: format!("CERN:{}", record.id),
embedding,
domain: Domain::Physics,
timestamp: date,
metadata,
});
}
Ok(vectors)
}
async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
let mut retries = 0;
loop {
match self.client.get(url).send().await {
Ok(response) => {
if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES {
retries += 1;
sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
continue;
}
return Ok(response);
}
Err(_) if retries < MAX_RETRIES => {
retries += 1;
sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
}
Err(e) => return Err(FrameworkError::Network(e)),
}
}
}
}
impl Default for CernOpenDataClient {
    /// Equivalent to [`CernOpenDataClient::new`], but panics on failure.
    ///
    /// # Panics
    ///
    /// Panics with `Failed to create CERN client: <error>` if `new` returns an error.
    fn default() -> Self {
        match Self::new() {
            Ok(client) => client,
            Err(err) => panic!("Failed to create CERN client: {:?}", err),
        }
    }
}
/// One Argo float profile record. Every field defaults when its key is missing.
///
/// NOTE(review): not referenced by the code visible in this file, since the Argo fetch
/// methods are placeholders. The per-level vectors below presumably mirror Argo's
/// JULD/PRES/TEMP/PSAL variables — confirm before relying on them.
#[derive(Debug, Deserialize)]
struct ArgoProfile {
    #[serde(default)]
    platform_number: String,
    #[serde(default)]
    cycle_number: u32,
    #[serde(default)]
    latitude: f64,
    #[serde(default)]
    longitude: f64,
    #[serde(default)]
    juld: f64,
    #[serde(default)]
    pres: Vec<f64>,
    #[serde(default)]
    temp: Vec<f64>,
    #[serde(default)]
    psal: Vec<f64>,
}
/// Client for Argo ocean-float profiles hosted on the Ifremer data server.
///
/// The network-backed query methods are placeholders that currently return no data.
/// [`ArgoClient::create_sample_profiles`] produces synthetic profiles instead.
pub struct ArgoClient {
    client: Client,
    base_url: String,
    rate_limit_delay: Duration,
    embedder: Arc<SimpleEmbedder>,
}

impl ArgoClient {
    /// Creates a client with a 30 s request timeout and a `SimpleEmbedder::new(256)`
    /// text embedder.
    ///
    /// # Errors
    ///
    /// Returns [`FrameworkError::Network`] if the HTTP client cannot be built.
    pub fn new() -> Result<Self> {
        let http = Client::builder()
            .timeout(Duration::from_secs(30))
            .build()
            .map_err(FrameworkError::Network)?;
        let embedder = Arc::new(SimpleEmbedder::new(256));
        Ok(Self {
            client: http,
            base_url: "https://data-argo.ifremer.fr".to_string(),
            rate_limit_delay: Duration::from_millis(ARGO_RATE_LIMIT_MS),
            embedder,
        })
    }

    /// Placeholder: not implemented yet; always returns `Ok` with an empty list.
    pub async fn get_recent_profiles(&self, _days: u32) -> Result<Vec<SemanticVector>> {
        Ok(vec![])
    }

    /// Placeholder: not implemented yet; always returns `Ok` with an empty list.
    pub async fn search_by_region(
        &self,
        _lat: f64,
        _lon: f64,
        _radius_km: f64,
    ) -> Result<Vec<SemanticVector>> {
        Ok(vec![])
    }

    /// Placeholder: not implemented yet; always returns `Ok` with an empty list.
    pub async fn get_temperature_profiles(&self) -> Result<Vec<SemanticVector>> {
        Ok(vec![])
    }

    /// Builds `count` deterministic synthetic ocean profiles (timestamps aside).
    ///
    /// Latitude sweeps from -60° upward and longitude is scattered via `(i * 7) % count`.
    /// Platform numbers start at 1900000 and timestamps step back `i % 30` days from now.
    ///
    /// NOTE(review): the synthetic temperature rises with |latitude|, which is the reverse
    /// of real ocean surface temperatures. Fine for placeholder data; confirm it is intended.
    pub fn create_sample_profiles(&self, count: usize) -> Result<Vec<SemanticVector>> {
        let total = count as f64;
        let profiles: Vec<SemanticVector> = (0..count)
            .map(|idx| {
                let lat = -60.0 + (120.0 * (idx as f64 / total));
                let lon = -180.0 + (360.0 * ((idx * 7) % count) as f64 / total);
                let temp = 5.0 + (15.0 * (lat.abs() / 90.0));
                let salinity = 34.0 + (2.0 * (lat / 90.0));
                let depth = 100.0 * (idx % 20) as f64;
                let platform = 1900000 + idx;

                let text = format!(
                    "Ocean profile at lat {} lon {}: temp {}°C, salinity {}, depth {}m",
                    lat, lon, temp, salinity, depth
                );
                let embedding = self.embedder.embed_text(&text);

                let metadata: HashMap<String, String> = vec![
                    ("platform_number", platform.to_string()),
                    ("latitude", lat.to_string()),
                    ("longitude", lon.to_string()),
                    ("temperature", temp.to_string()),
                    ("salinity", salinity.to_string()),
                    ("depth_m", depth.to_string()),
                    ("source", "argo".to_string()),
                ]
                .into_iter()
                .map(|(key, value)| (key.to_string(), value))
                .collect();

                SemanticVector {
                    id: format!("ARGO:{}", platform),
                    embedding,
                    domain: Domain::Ocean,
                    timestamp: Utc::now() - chrono::Duration::days(idx as i64 % 30),
                    metadata,
                }
            })
            .collect();
        Ok(profiles)
    }
}
impl Default for ArgoClient {
    /// Equivalent to [`ArgoClient::new`], but panics on failure.
    ///
    /// # Panics
    ///
    /// Panics with `Failed to create Argo client: <error>` if `new` returns an error.
    fn default() -> Self {
        match Self::new() {
            Ok(client) => client,
            Err(err) => panic!("Failed to create Argo client: {:?}", err),
        }
    }
}
/// A material summary document from the Materials Project API.
///
/// Only `material_id` is required. Every other field defaults when its key is missing.
#[derive(Debug, Deserialize)]
struct MaterialsProjectMaterial {
    /// Material identifier; the client prefixes it with `MP:` to build the vector id.
    material_id: String,
    #[serde(default)]
    formula_pretty: String,
    /// Rendered by the client in eV.
    #[serde(default)]
    band_gap: Option<f64>,
    /// Rendered by the client in g/cm³.
    #[serde(default)]
    density: Option<f64>,
    /// Rendered by the client in eV/atom.
    #[serde(default)]
    formation_energy_per_atom: Option<f64>,
    #[serde(default)]
    energy_per_atom: Option<f64>,
    #[serde(default)]
    volume: Option<f64>,
    #[serde(default)]
    nsites: Option<u32>,
    #[serde(default)]
    elements: Vec<String>,
    #[serde(default)]
    nelements: Option<u32>,
    /// Top-level crystal system; the client falls back to `symmetry.crystal_system`.
    #[serde(default)]
    crystal_system: Option<String>,
    #[serde(default)]
    symmetry: Option<MaterialsSymmetry>,
}

/// Nested `symmetry` block of a material document.
#[derive(Debug, Deserialize)]
struct MaterialsSymmetry {
    #[serde(default)]
    crystal_system: String,
    #[serde(default)]
    symbol: String,
}

/// `{ "data": [...] }` envelope returned by the summary search endpoint.
#[derive(Debug, Deserialize)]
struct MaterialsProjectResponse {
    #[serde(default)]
    data: Vec<MaterialsProjectMaterial>,
}
pub struct MaterialsProjectClient {
client: Client,
base_url: String,
api_key: String,
rate_limit_delay: Duration,
embedder: Arc<SimpleEmbedder>,
}
impl MaterialsProjectClient {
pub fn new(api_key: String) -> Result<Self> {
let client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(FrameworkError::Network)?;
Ok(Self {
client,
base_url: "https://api.materialsproject.org".to_string(),
api_key,
rate_limit_delay: Duration::from_millis(MATERIALS_PROJECT_RATE_LIMIT_MS),
embedder: Arc::new(SimpleEmbedder::new(256)),
})
}
pub async fn search_materials(&self, formula: &str) -> Result<Vec<SemanticVector>> {
let url = format!(
"{}/materials/summary/?formula={}",
self.base_url,
urlencoding::encode(formula)
);
sleep(self.rate_limit_delay).await;
let response = self.client
.get(&url)
.header("X-API-KEY", &self.api_key)
.send()
.await?;
let mp_response: MaterialsProjectResponse = response.json().await?;
self.convert_materials(mp_response.data)
}
pub async fn get_material(&self, material_id: &str) -> Result<Vec<SemanticVector>> {
let url = format!(
"{}/materials/{}/",
self.base_url, material_id
);
sleep(self.rate_limit_delay).await;
let response = self.client
.get(&url)
.header("X-API-KEY", &self.api_key)
.send()
.await?;
let material: MaterialsProjectMaterial = response.json().await?;
self.convert_materials(vec![material])
}
pub async fn search_by_property(
&self,
property: &str,
min: f64,
max: f64,
) -> Result<Vec<SemanticVector>> {
let url = format!(
"{}/materials/summary/?{}_min={}&{}_max={}",
self.base_url, property, min, property, max
);
sleep(self.rate_limit_delay).await;
let response = self.client
.get(&url)
.header("X-API-KEY", &self.api_key)
.send()
.await?;
let mp_response: MaterialsProjectResponse = response.json().await?;
self.convert_materials(mp_response.data)
}
fn convert_materials(&self, materials: Vec<MaterialsProjectMaterial>) -> Result<Vec<SemanticVector>> {
let mut vectors = Vec::new();
for material in materials {
let band_gap = material.band_gap.unwrap_or(0.0);
let density = material.density.unwrap_or(0.0);
let formation_energy = material.formation_energy_per_atom.unwrap_or(0.0);
let crystal_system = material
.crystal_system
.or_else(|| material.symmetry.as_ref().map(|s| s.crystal_system.clone()))
.unwrap_or_default();
let text = format!(
"{} {} crystal system, band gap {} eV, density {} g/cm³, formation energy {} eV/atom",
material.formula_pretty, crystal_system, band_gap, density, formation_energy
);
let embedding = self.embedder.embed_text(&text);
let mut metadata = HashMap::new();
metadata.insert("material_id".to_string(), material.material_id.clone());
metadata.insert("formula".to_string(), material.formula_pretty);
metadata.insert("band_gap".to_string(), band_gap.to_string());
metadata.insert("density".to_string(), density.to_string());
metadata.insert("formation_energy".to_string(), formation_energy.to_string());
metadata.insert("crystal_system".to_string(), crystal_system);
metadata.insert("elements".to_string(), material.elements.join(","));
metadata.insert("source".to_string(), "materials_project".to_string());
vectors.push(SemanticVector {
id: format!("MP:{}", material.material_id),
embedding,
domain: Domain::Physics,
timestamp: Utc::now(),
metadata,
});
}
Ok(vectors)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// New York City -> Los Angeles is roughly 3,936 km along the great circle.
    #[test]
    fn test_geo_utils_distance() {
        let km = GeoUtils::distance_km(40.7128, -74.0060, 34.0522, -118.2437);
        let error = (km - 3936.0).abs();
        assert!(error < 100.0);
    }

    #[test]
    fn test_geo_utils_within_radius() {
        let (lat0, lon0) = (34.05, -118.25);
        // About 50 km due north of the centre: inside a 100 km radius.
        assert!(GeoUtils::within_radius(lat0, lon0, 34.5, -118.25, 100.0));
        // New York is thousands of km away: outside a 10 km radius.
        assert!(!GeoUtils::within_radius(lat0, lon0, 40.7, -74.0, 10.0));
    }

    #[tokio::test]
    async fn test_usgs_client_creation() {
        assert!(UsgsEarthquakeClient::new().is_ok());
    }

    #[tokio::test]
    async fn test_cern_client_creation() {
        assert!(CernOpenDataClient::new().is_ok());
    }

    #[tokio::test]
    async fn test_argo_client_creation() {
        assert!(ArgoClient::new().is_ok());
    }

    #[tokio::test]
    async fn test_materials_project_client_creation() {
        assert!(MaterialsProjectClient::new("test_key".to_string()).is_ok());
    }

    #[tokio::test]
    async fn test_argo_sample_profiles() {
        let argo = ArgoClient::new().unwrap();
        let generated = argo.create_sample_profiles(10);
        assert!(generated.is_ok());
        let profiles = generated.unwrap();
        assert_eq!(profiles.len(), 10);
        assert_eq!(profiles[0].domain, Domain::Ocean);
    }

    /// Each client stores the rate-limit delay from its module-level constant.
    #[test]
    fn test_rate_limiting() {
        let ms = Duration::from_millis;
        assert_eq!(
            UsgsEarthquakeClient::new().unwrap().rate_limit_delay,
            ms(USGS_RATE_LIMIT_MS)
        );
        assert_eq!(
            CernOpenDataClient::new().unwrap().rate_limit_delay,
            ms(CERN_RATE_LIMIT_MS)
        );
        assert_eq!(
            ArgoClient::new().unwrap().rate_limit_delay,
            ms(ARGO_RATE_LIMIT_MS)
        );
        assert_eq!(
            MaterialsProjectClient::new("test".to_string())
                .unwrap()
                .rate_limit_delay,
            ms(MATERIALS_PROJECT_RATE_LIMIT_MS)
        );
    }
}