use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use parking_lot::RwLock;
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use thiserror::Error;
use tracing::{debug, info, warn};
use crate::domain::error::{Result as DomainResult, ServiceError, StygianError};
use crate::ports::data_sink::{DataSinkError, DataSinkPort, SinkReceipt, SinkRecord};
use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
/// Errors produced by the Scrape Exchange client, adapter, and feed.
#[derive(Debug, Error)]
pub enum ScrapeExchangeError {
    /// Transport-level failure from the underlying `reqwest` call (DNS, TLS, timeout, ...).
    #[error("HTTP request failed: {0}")]
    HttpError(#[from] reqwest::Error),
    /// A response body could not be serialized/deserialized as JSON.
    #[error("JSON parsing error: {0}")]
    JsonError(#[from] serde_json::Error),
    /// Credentials were rejected (HTTP 401), or a re-authentication attempt did not help.
    #[error("Authentication failed: {0}")]
    AuthFailed(String),
    /// The token endpoint returned an unexpected status, or no token was stored after refresh.
    #[error("Token refresh failed: {0}")]
    TokenRefreshFailed(String),
    /// HTTP 429 after retries were exhausted; callers should wait `retry_after_secs`.
    #[error("Rate limited; retry after {retry_after_secs}s")]
    RateLimited {
        retry_after_secs: u64,
    },
    /// Any other non-success API status, carrying the raw response body as `message`.
    #[error("API error: {status} {message}")]
    ApiError {
        status: u16,
        message: String,
    },
    /// Missing/empty credentials, an invalid URL, or another setup problem.
    #[error("Invalid configuration: {0}")]
    InvalidConfig(String),
    /// The `/status` endpoint did not return HTTP 200.
    #[error("Health check failed: {0}")]
    HealthCheckFailed(String),
}
/// Connection settings for the Scrape Exchange API.
#[derive(Debug, Clone)]
pub struct ScrapeExchangeConfig {
    /// API key identifier exchanged for a JWT at the token endpoint.
    pub api_key_id: String,
    /// Secret paired with `api_key_id`.
    pub api_key_secret: String,
    /// Base URL; endpoint paths are appended directly, so it is expected to
    /// end with a trailing slash (the default does).
    pub base_url: String,
}
impl ScrapeExchangeConfig {
    /// Builds a configuration from the `SCRAPE_EXCHANGE_*` environment
    /// variables. `SCRAPE_EXCHANGE_BASE_URL` is optional and falls back to
    /// the public API endpoint.
    ///
    /// # Errors
    /// Returns [`ScrapeExchangeError::InvalidConfig`] when a required
    /// variable is unset.
    pub fn from_env() -> std::result::Result<Self, ScrapeExchangeError> {
        // Required variable lookup: absence maps to InvalidConfig with a
        // "<NAME> not set" message.
        fn required(name: &str) -> std::result::Result<String, ScrapeExchangeError> {
            std::env::var(name)
                .map_err(|_| ScrapeExchangeError::InvalidConfig(format!("{name} not set")))
        }
        Ok(Self {
            api_key_id: required("SCRAPE_EXCHANGE_KEY_ID")?,
            api_key_secret: required("SCRAPE_EXCHANGE_KEY_SECRET")?,
            base_url: std::env::var("SCRAPE_EXCHANGE_BASE_URL")
                .unwrap_or_else(|_| "https://scrape.exchange/api/".to_string()),
        })
    }
}
/// Wire format of a successful response from the token endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)]
struct JwtTokenResponse {
    /// The raw JWT sent as the bearer credential on subsequent requests.
    access_token: String,
    /// Token scheme reported by the server (expected to be "Bearer").
    token_type: String,
    /// Lifetime in seconds, relative to issuance.
    expires_in: u64,
}
/// A JWT plus the local Unix time it was received, so expiry can be checked
/// without contacting the server again.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct JwtToken {
    access_token: String,
    token_type: String,
    /// Server-reported lifetime in seconds.
    expires_in: u64,
    /// Local Unix timestamp (seconds) captured when the token was stored.
    issued_at_secs: u64,
}
impl JwtToken {
    /// Stamps a wire response with the current Unix time so expiry can be
    /// computed locally later.
    fn from_response(response: JwtTokenResponse) -> Self {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        Self {
            access_token: response.access_token,
            token_type: response.token_type,
            expires_in: response.expires_in,
            issued_at_secs: now_secs,
        }
    }
    /// Returns `true` when the token is expired or will expire within a 30s
    /// grace window, so callers refresh slightly early instead of racing the
    /// server clock.
    fn is_expired(&self) -> bool {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let grace_period_secs = 30;
        // Saturating arithmetic: a pathological `expires_in` must not
        // overflow the addition, and a lifetime shorter than the grace
        // window must not underflow — either would panic in debug builds.
        let expiry = self
            .issued_at_secs
            .saturating_add(self.expires_in)
            .saturating_sub(grace_period_secs);
        now_secs >= expiry
    }
}
/// Authenticated HTTP client for the Scrape Exchange REST API.
pub struct ScrapeExchangeClient {
    config: ScrapeExchangeConfig,
    http_client: Client,
    /// Cached JWT, refreshed in place; `None` only before the first
    /// successful authentication.
    token: Arc<RwLock<Option<JwtToken>>>,
}
impl ScrapeExchangeClient {
    /// Creates a client and eagerly obtains a first JWT so later calls start
    /// with a valid token.
    ///
    /// # Errors
    /// `InvalidConfig` if either credential is empty; otherwise any failure
    /// from the initial token refresh.
    pub async fn new(
        config: ScrapeExchangeConfig,
    ) -> std::result::Result<Self, ScrapeExchangeError> {
        if config.api_key_id.is_empty() || config.api_key_secret.is_empty() {
            return Err(ScrapeExchangeError::InvalidConfig(
                "api_key_id and api_key_secret must not be empty".to_string(),
            ));
        }
        let instance = Self {
            config,
            http_client: Client::new(),
            token: Arc::new(RwLock::new(None)),
        };
        instance.refresh_token().await?;
        Ok(instance)
    }

    /// Parses the `Retry-After` response header as whole seconds, defaulting
    /// to 60 when absent or malformed.
    fn retry_after_secs(response: &reqwest::Response) -> u64 {
        response
            .headers()
            .get("Retry-After")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(60)
    }

    /// Exchanges the API key pair for a fresh JWT and stores it in the cache.
    ///
    /// # Errors
    /// `AuthFailed` on 401, `RateLimited` on 429 (honoring `Retry-After`),
    /// `TokenRefreshFailed` for any other non-200 status.
    async fn refresh_token(&self) -> std::result::Result<(), ScrapeExchangeError> {
        let auth_url = format!("{}account/v1/token", self.config.base_url);
        debug!("Refreshing JWT token from {}", auth_url);
        let response = self
            .http_client
            .post(&auth_url)
            .json(&json!({
                "api_key_id": self.config.api_key_id,
                "api_key_secret": self.config.api_key_secret,
            }))
            .send()
            .await?;
        match response.status() {
            StatusCode::OK => {
                let token_response: JwtTokenResponse = response.json().await?;
                let expires_in = token_response.expires_in;
                *self.token.write() = Some(JwtToken::from_response(token_response));
                debug!("JWT token refreshed; expires in {}s", expires_in);
                Ok(())
            }
            StatusCode::UNAUTHORIZED => Err(ScrapeExchangeError::AuthFailed(
                "Invalid API credentials".to_string(),
            )),
            StatusCode::TOO_MANY_REQUESTS => Err(ScrapeExchangeError::RateLimited {
                retry_after_secs: Self::retry_after_secs(&response),
            }),
            status => {
                let body = response.text().await.unwrap_or_default();
                Err(ScrapeExchangeError::TokenRefreshFailed(format!(
                    "{status}: {body}"
                )))
            }
        }
    }

    /// Returns a currently valid access token, refreshing first when the
    /// cached one is missing or expired.
    async fn get_token(&self) -> std::result::Result<String, ScrapeExchangeError> {
        // Fast path: reuse the cached token. The read guard lives only
        // inside this scope, so it is released before any `.await` below.
        {
            let token_lock = self.token.read();
            if let Some(token) = token_lock.as_ref() {
                if !token.is_expired() {
                    return Ok(token.access_token.clone());
                }
            }
        }
        // Slow path: refresh, then read back the newly stored token.
        // (The previous version also acquired and immediately dropped an
        // extra read lock here, which was a no-op and has been removed.)
        self.refresh_token().await?;
        self.token
            .read()
            .as_ref()
            .map(|t| t.access_token.clone())
            .ok_or_else(|| {
                ScrapeExchangeError::TokenRefreshFailed("Token not set after refresh".to_string())
            })
    }

    /// Uploads one JSON payload, retrying on 429 (up to 3 times, honoring
    /// `Retry-After`) and retrying once on 401 after a forced token refresh.
    ///
    /// # Errors
    /// `RateLimited` / `AuthFailed` when retries are exhausted, `ApiError`
    /// for any other non-2xx status.
    pub async fn upload(&self, data: Value) -> std::result::Result<Value, ScrapeExchangeError> {
        let url = format!("{}data/v1/", self.config.base_url);
        let mut retries = 0;
        let max_retries = 3;
        loop {
            // Resolve the token on every attempt: after a 401-triggered
            // refresh below, the retry must carry the NEW token. (Previously
            // the token was captured once before the loop, so the retry
            // re-sent the stale credential and always failed.)
            let token = self.get_token().await?;
            let response = self
                .http_client
                .post(&url)
                .bearer_auth(&token)
                .json(&data)
                .send()
                .await?;
            match response.status() {
                StatusCode::OK | StatusCode::CREATED => {
                    let result = response.json().await?;
                    debug!("Data uploaded successfully");
                    return Ok(result);
                }
                StatusCode::TOO_MANY_REQUESTS => {
                    let retry_after = Self::retry_after_secs(&response);
                    if retries < max_retries {
                        retries += 1;
                        let backoff_ms = retry_after * 1000;
                        warn!(
                            "Rate limited; retrying in {}ms (attempt {}/{})",
                            backoff_ms, retries, max_retries
                        );
                        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
                        continue;
                    }
                    return Err(ScrapeExchangeError::RateLimited {
                        retry_after_secs: retry_after,
                    });
                }
                StatusCode::UNAUTHORIZED => {
                    // One forced refresh; if the server still rejects us the
                    // credentials themselves are bad.
                    if retries == 0 {
                        retries = 1;
                        self.refresh_token().await?;
                        continue;
                    }
                    return Err(ScrapeExchangeError::AuthFailed(
                        "Reauthorization failed".to_string(),
                    ));
                }
                status => {
                    let body = response.text().await.unwrap_or_default();
                    return Err(ScrapeExchangeError::ApiError {
                        status: status.as_u16(),
                        message: body,
                    });
                }
            }
        }
    }

    /// Shared authenticated-GET path for `query` and `item_lookup`: sends
    /// the request with a bearer token and maps the response, using
    /// `not_found_message` as the 404 error body.
    async fn get_json(
        &self,
        url: &str,
        not_found_message: String,
    ) -> std::result::Result<Value, ScrapeExchangeError> {
        let token = self.get_token().await?;
        let response = self
            .http_client
            .get(url)
            .bearer_auth(&token)
            .send()
            .await?;
        match response.status() {
            StatusCode::OK => Ok(response.json().await?),
            StatusCode::UNAUTHORIZED => {
                // Refresh so the caller's NEXT call can succeed, but surface
                // the failure (original behavior: no transparent retry here).
                self.refresh_token().await?;
                Err(ScrapeExchangeError::AuthFailed(
                    "Reauthorization required".to_string(),
                ))
            }
            StatusCode::NOT_FOUND => Err(ScrapeExchangeError::ApiError {
                status: 404,
                message: not_found_message,
            }),
            status => {
                let body = response.text().await.unwrap_or_default();
                Err(ScrapeExchangeError::ApiError {
                    status: status.as_u16(),
                    message: body,
                })
            }
        }
    }

    /// Queries records by uploader / platform / entity path parameters.
    ///
    /// # Errors
    /// `AuthFailed` on 401, `ApiError` (404) when the parameters match
    /// nothing, `ApiError` for any other non-200 status.
    pub async fn query(
        &self,
        uploader: &str,
        platform: &str,
        entity: &str,
    ) -> std::result::Result<Value, ScrapeExchangeError> {
        let url = format!(
            "{}data/v1/param/{}/{}/{}",
            self.config.base_url, uploader, platform, entity
        );
        debug!("Querying {}", url);
        self.get_json(&url, "Query parameters not found".to_string())
            .await
    }

    /// Looks up a single record by its Scrape Exchange item id.
    ///
    /// # Errors
    /// Same mapping as [`Self::query`]; 404 reports the missing `item_id`.
    pub async fn item_lookup(
        &self,
        item_id: &str,
    ) -> std::result::Result<Value, ScrapeExchangeError> {
        let url = format!("{}data/v1/item_id/{}", self.config.base_url, item_id);
        debug!("Looking up item: {}", item_id);
        self.get_json(&url, format!("Item not found: {item_id}"))
            .await
    }

    /// Probes the unauthenticated `/status` endpoint.
    ///
    /// # Errors
    /// `HealthCheckFailed` for any non-200 status, with the body attached.
    pub async fn health_check(&self) -> std::result::Result<(), ScrapeExchangeError> {
        let url = format!("{}status", self.config.base_url);
        debug!("Health check: {}", url);
        let response = self.http_client.get(&url).send().await?;
        match response.status() {
            StatusCode::OK => {
                info!("Scrape Exchange API is healthy");
                Ok(())
            }
            status => {
                let body = response.text().await.unwrap_or_default();
                Err(ScrapeExchangeError::HealthCheckFailed(format!(
                    "{status}: {body}"
                )))
            }
        }
    }
}
/// Adapter exposing [`ScrapeExchangeClient`] through the crate's port
/// traits (`DataSinkPort`, `ScrapingService`).
pub struct ScrapeExchangeAdapter {
    client: Arc<ScrapeExchangeClient>,
}
impl ScrapeExchangeAdapter {
pub async fn new(
config: ScrapeExchangeConfig,
) -> std::result::Result<Self, ScrapeExchangeError> {
let client = ScrapeExchangeClient::new(config).await?;
Ok(Self {
client: Arc::new(client),
})
}
pub fn client(&self) -> &ScrapeExchangeClient {
&self.client
}
fn map_record(record: &SinkRecord) -> Value {
json!({
"schema_id": record.schema_id,
"source": record.source_url,
"content": record.data,
"metadata": record.metadata,
})
}
fn local_validate(record: &SinkRecord) -> std::result::Result<(), DataSinkError> {
if !record.schema_id.is_empty() && record.data.is_null() {
return Err(DataSinkError::ValidationFailed(
"data must not be null when schema_id is set".to_string(),
));
}
if let Some(obj) = record.data.as_object()
&& obj.is_empty()
{
return Err(DataSinkError::ValidationFailed(
"data object must not be empty".to_string(),
));
}
Ok(())
}
}
#[async_trait]
impl DataSinkPort for ScrapeExchangeAdapter {
    /// Validates, maps, and uploads one record, translating client errors
    /// into sink-level errors and extracting a receipt from the response.
    async fn publish(
        &self,
        record: &SinkRecord,
    ) -> std::result::Result<SinkReceipt, DataSinkError> {
        Self::local_validate(record)?;
        let payload = Self::map_record(record);
        // Translate transport/auth errors into the sink error vocabulary.
        let result = match self.client.upload(payload).await {
            Ok(value) => value,
            Err(ScrapeExchangeError::RateLimited { retry_after_secs }) => {
                return Err(DataSinkError::RateLimited(format!(
                    "retry after {retry_after_secs}s"
                )));
            }
            Err(ScrapeExchangeError::AuthFailed(msg)) => {
                return Err(DataSinkError::Unauthorized(msg));
            }
            Err(other) => return Err(DataSinkError::PublishFailed(other.to_string())),
        };
        // Receipt fields are optional in the response; fall back gracefully.
        let string_field = |key: &str, fallback: &str| {
            result
                .get(key)
                .and_then(Value::as_str)
                .unwrap_or(fallback)
                .to_string()
        };
        Ok(SinkReceipt {
            id: string_field("id", "unknown"),
            published_at: string_field("created_at", ""),
            platform: "scrape-exchange".to_string(),
        })
    }
    /// Runs only the local structural checks; no network access.
    async fn validate(&self, record: &SinkRecord) -> std::result::Result<(), DataSinkError> {
        Self::local_validate(record)
    }
    /// Probes the remote `/status` endpoint through the client.
    async fn health_check(&self) -> std::result::Result<(), DataSinkError> {
        match self.client.health_check().await {
            Ok(()) => Ok(()),
            Err(e) => Err(DataSinkError::PublishFailed(format!(
                "health check failed: {e}"
            ))),
        }
    }
}
#[async_trait]
impl ScrapingService for ScrapeExchangeAdapter {
    /// Treats `input.url` as a Scrape Exchange item id and resolves it via
    /// the lookup API, returning the raw JSON as a string.
    async fn execute(&self, input: ServiceInput) -> DomainResult<ServiceOutput> {
        debug!("ScrapeExchangeAdapter::execute url={}", input.url);
        match self.client.item_lookup(&input.url).await {
            Ok(result) => Ok(ServiceOutput {
                data: result.to_string(),
                metadata: json!({ "platform": "scrape-exchange", "url": input.url }),
            }),
            Err(e) => Err(StygianError::from(ServiceError::Unavailable(e.to_string()))),
        }
    }
    /// Stable service identifier used by registries and logs.
    fn name(&self) -> &'static str {
        "scrape-exchange"
    }
}
use futures::stream::StreamExt;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use crate::ports::stream_source::{StreamEvent, StreamSourcePort};
/// Optional filters for the live upload feed. `None` fields mean "no
/// constraint". `schema_owner` / `schema_version` are additionally applied
/// client-side (see `passes_client_filter`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FeedFilter {
    pub platform: Option<String>,
    pub entity: Option<String>,
    pub uploader: Option<String>,
    pub creator_id: Option<String>,
    pub schema_owner: Option<String>,
    pub schema_version: Option<String>,
}
/// How much of each upload the feed should deliver; serialized in
/// snake_case (e.g. `upload_metadata`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FeedMessageType {
    /// Complete message payloads (the default).
    #[default]
    Full,
    UploadMetadata,
    PlatformMetadata,
}
/// Configuration for the WebSocket feed, including reconnect policy.
#[derive(Debug, Clone)]
pub struct FeedConfig {
    pub filter: FeedFilter,
    pub message_type: FeedMessageType,
    /// Reconnect attempts made after the initial failure before giving up.
    pub max_reconnect_attempts: u32,
    /// Backoff before the first retry; doubles on each subsequent attempt.
    pub initial_backoff_ms: u64,
}
impl Default for FeedConfig {
    /// Defaults: no filtering, full messages, five reconnect attempts
    /// starting from a 500ms backoff.
    fn default() -> Self {
        let filter = FeedFilter::default();
        let message_type = FeedMessageType::Full;
        Self {
            filter,
            message_type,
            max_reconnect_attempts: 5,
            initial_backoff_ms: 500,
        }
    }
}
/// WebSocket consumer for the live Scrape Exchange upload feed.
pub struct ScrapeExchangeFeed {
    config: FeedConfig,
    /// Optional bearer token sent in the WebSocket handshake.
    bearer_token: Option<String>,
}
impl ScrapeExchangeFeed {
    /// Creates a feed with no bearer token; use [`Self::with_bearer_token`]
    /// for authenticated streams.
    #[must_use]
    pub const fn new(config: FeedConfig) -> Self {
        Self {
            config,
            bearer_token: None,
        }
    }
    /// Attaches a bearer token to send in the WebSocket handshake.
    #[must_use]
    pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
        self.bearer_token = Some(token.into());
        self
    }
    /// Exponential reconnect backoff: `initial_backoff_ms * 2^attempt`,
    /// capped at 30 seconds.
    fn backoff_duration(&self, attempt: u32) -> Duration {
        // `1_u64 << attempt` panics in debug builds (and wraps in release)
        // once attempt >= 64; checked_shl + saturating_mul make both
        // overflow paths collapse into the 30s cap instead.
        let factor = 1_u64.checked_shl(attempt).unwrap_or(u64::MAX);
        let ms = self.config.initial_backoff_ms.saturating_mul(factor);
        Duration::from_millis(ms.min(30_000))
    }
    /// Builds the WS handshake request, adding an `Authorization: Bearer`
    /// header when a token was configured.
    ///
    /// # Errors
    /// `InvalidConfig` for an unparseable URL or a token that is not a
    /// valid header value.
    fn build_request(
        &self,
        url: &str,
    ) -> std::result::Result<
        tokio_tungstenite::tungstenite::handshake::client::Request,
        ScrapeExchangeError,
    > {
        let mut req = url
            .into_client_request()
            .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("invalid WS URL: {e}")))?;
        if let Some(token) = &self.bearer_token {
            let value = format!("Bearer {token}")
                .parse()
                .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("invalid token: {e}")))?;
            req.headers_mut().insert("Authorization", value);
        }
        Ok(req)
    }
    /// Client-side filter on `schema_owner` / `schema_version`. Events whose
    /// payload is not valid JSON pass through unfiltered (the server-side
    /// filters still apply to them).
    fn passes_client_filter(&self, event: &StreamEvent) -> bool {
        let filter = &self.config.filter;
        if filter.schema_owner.is_none() && filter.schema_version.is_none() {
            return true;
        }
        let Ok(val) = serde_json::from_str::<Value>(&event.data) else {
            return true;
        };
        // A constraint matches when the payload carries exactly that string.
        let field_matches = |key: &str, expected: &Option<String>| match expected {
            Some(want) => val.get(key).and_then(Value::as_str) == Some(want.as_str()),
            None => true,
        };
        field_matches("schema_owner", &filter.schema_owner)
            && field_matches("schema_version", &filter.schema_version)
    }
    /// One connection lifetime: reads messages until the stream closes,
    /// errors, stays silent for 120s, or `max_events` have been collected.
    ///
    /// # Errors
    /// `InvalidConfig` when the handshake request cannot be built or the
    /// connection fails; mid-stream errors end the read loop without error.
    async fn subscribe_once(
        &self,
        url: &str,
        max_events: Option<usize>,
    ) -> std::result::Result<Vec<StreamEvent>, ScrapeExchangeError> {
        let req = self.build_request(url)?;
        let (ws_stream, _) = connect_async(req)
            .await
            .map_err(|e| ScrapeExchangeError::InvalidConfig(format!("WS connect failed: {e}")))?;
        let (_, mut read) = ws_stream.split();
        let mut events = Vec::new();
        // Per-message idle timeout: give up if nothing arrives in time.
        let deadline = Duration::from_secs(120);
        let cap = max_events.unwrap_or(usize::MAX);
        while events.len() < cap {
            let msg = match timeout(deadline, read.next()).await {
                Ok(Some(Ok(m))) => m,
                Ok(Some(Err(e))) => {
                    warn!("WS message error: {e}");
                    break;
                }
                // Stream ended, or the idle timeout elapsed.
                Ok(None) | Err(_) => break,
            };
            let text = match msg {
                Message::Text(t) => t.to_string(),
                // Control frames carry no payload we care about.
                Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,
                Message::Close(_) => break,
                Message::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
            };
            let event = StreamEvent {
                id: None,
                event_type: Some("upload".to_string()),
                data: text,
            };
            if self.passes_client_filter(&event) {
                events.push(event);
            }
        }
        Ok(events)
    }
}
#[async_trait]
impl StreamSourcePort for ScrapeExchangeFeed {
    /// Subscribes with automatic reconnection: each failed attempt waits an
    /// exponential backoff, and once `max_reconnect_attempts` extra attempts
    /// are exhausted the last error surfaces as `ServiceError::Unavailable`.
    async fn subscribe(
        &self,
        url: &str,
        max_events: Option<usize>,
    ) -> crate::domain::error::Result<Vec<StreamEvent>> {
        let mut attempt = 0u32;
        loop {
            // Success returns immediately; failure falls through to retry.
            let e = match self.subscribe_once(url, max_events).await {
                Ok(events) => return Ok(events),
                Err(e) => e,
            };
            if attempt >= self.config.max_reconnect_attempts {
                return Err(StygianError::from(ServiceError::Unavailable(format!(
                    "WS feed failed after {} attempts: {e}",
                    attempt + 1
                ))));
            }
            let delay = self.backoff_duration(attempt);
            warn!(
                "WS feed attempt {}/{} failed ({e}), retrying in {}ms",
                attempt + 1,
                self.config.max_reconnect_attempts,
                delay.as_millis()
            );
            tokio::time::sleep(delay).await;
            attempt += 1;
        }
    }
    /// Stable identifier for this stream source.
    fn source_name(&self) -> &'static str {
        "scrape-exchange-feed"
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A token issued "now" with a 1-hour lifetime is inside the grace window.
    #[test]
    fn test_jwt_token_expiry() {
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let token = JwtToken {
            access_token: "test".to_string(),
            token_type: "Bearer".to_string(),
            expires_in: 3600,
            issued_at_secs: now_secs,
        };
        assert!(!token.is_expired());
    }

    /// The token endpoint's wire format deserializes field-for-field.
    #[test]
    fn test_jwt_token_parsing() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let json_str = r#"{"access_token":"test_jwt","token_type":"Bearer","expires_in":3600}"#;
        let response: JwtTokenResponse = serde_json::from_str(json_str)?;
        assert_eq!(response.access_token, "test_jwt");
        assert_eq!(response.token_type, "Bearer");
        assert_eq!(response.expires_in, 3600);
        Ok(())
    }

    /// Plain struct construction round-trips all config fields.
    #[test]
    fn test_scrape_exchange_config_construction() {
        let config = ScrapeExchangeConfig {
            api_key_id: "test_id".to_string(),
            api_key_secret: "test_secret".to_string(),
            base_url: "https://test.api/".to_string(),
        };
        assert_eq!(config.api_key_id, "test_id");
        assert_eq!(config.api_key_secret, "test_secret");
        assert_eq!(config.base_url, "https://test.api/");
    }

    /// Display strings match the `#[error]` templates exactly.
    #[test]
    fn test_scrape_exchange_error_display() {
        let err = ScrapeExchangeError::InvalidConfig("test".to_string());
        assert_eq!(err.to_string(), "Invalid configuration: test");
        let err = ScrapeExchangeError::RateLimited {
            retry_after_secs: 30,
        };
        assert_eq!(err.to_string(), "Rate limited; retry after 30s");
        let err = ScrapeExchangeError::ApiError {
            status: 500,
            message: "Internal error".to_string(),
        };
        assert_eq!(err.to_string(), "API error: 500 Internal error");
    }

    /// Null data combined with a schema_id must fail local validation.
    #[test]
    fn test_validate_rejects_null_data_with_schema()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new("product-v1", "https://example.com", Value::Null);
        let result = ScrapeExchangeAdapter::local_validate(&record);
        let msg = match result.err() {
            Some(err) => err.to_string(),
            None => {
                return Err(std::io::Error::other("null data with schema_id should fail").into());
            }
        };
        assert!(msg.contains("null"), "error should mention null: {msg}");
        Ok(())
    }

    /// An empty JSON object must fail local validation.
    #[test]
    fn test_validate_rejects_empty_object() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new(
            "product-v1",
            "https://example.com",
            serde_json::Value::Object(serde_json::Map::new()),
        );
        let result = ScrapeExchangeAdapter::local_validate(&record);
        let msg = match result.err() {
            Some(err) => err.to_string(),
            None => return Err(std::io::Error::other("empty object should fail validation").into()),
        };
        assert!(msg.contains("empty"), "error should mention empty: {msg}");
        Ok(())
    }

    /// A non-empty object with a schema_id passes local validation.
    #[test]
    fn test_validate_accepts_valid_record() {
        let record = SinkRecord::new(
            "product-v1",
            "https://example.com",
            serde_json::json!({ "sku": "ABC-42" }),
        );
        let result = ScrapeExchangeAdapter::local_validate(&record);
        assert!(result.is_ok(), "valid record should pass: {result:?}");
    }

    /// map_record keeps schema_id/source and nests data under "content".
    #[test]
    fn test_map_record_produces_correct_fields() {
        let record = SinkRecord::new(
            "order-v2",
            "https://shop.example.com/orders/99",
            serde_json::json!({ "total": 39.99 }),
        );
        let mapped = ScrapeExchangeAdapter::map_record(&record);
        assert_eq!(
            mapped.get("schema_id"),
            Some(&serde_json::json!("order-v2"))
        );
        assert_eq!(
            mapped.get("source"),
            Some(&serde_json::json!("https://shop.example.com/orders/99"))
        );
        let total = mapped
            .get("content")
            .and_then(serde_json::Value::as_object)
            .and_then(|content| content.get("total"))
            .and_then(serde_json::Value::as_f64);
        assert_eq!(total, Some(39.99));
    }

    /// Mirrors the RateLimited -> DataSinkError::RateLimited mapping used in publish().
    #[test]
    fn test_rate_limit_error_mapping() {
        let se_err = ScrapeExchangeError::RateLimited {
            retry_after_secs: 60,
        };
        let mapped: DataSinkError = match se_err {
            ScrapeExchangeError::RateLimited { retry_after_secs } => {
                DataSinkError::RateLimited(format!("retry after {retry_after_secs}s"))
            }
            other => DataSinkError::PublishFailed(other.to_string()),
        };
        assert!(
            mapped.to_string().contains("60"),
            "should mention 60s: {mapped}"
        );
    }

    /// FeedFilter serializes its Some fields under the expected JSON keys.
    #[test]
    fn test_feed_filter_serialization() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let filter = FeedFilter {
            platform: Some("web".to_string()),
            entity: Some("products".to_string()),
            uploader: Some("alice".to_string()),
            creator_id: None,
            schema_owner: None,
            schema_version: None,
        };
        let json = serde_json::to_string(&filter)?;
        assert!(json.contains("\"platform\":\"web\""), "platform: {json}");
        assert!(json.contains("\"entity\":\"products\""), "entity: {json}");
        assert!(json.contains("\"uploader\":\"alice\""), "uploader: {json}");
        Ok(())
    }

    /// Backoff doubles per attempt and saturates at the 30s ceiling.
    #[test]
    fn test_feed_backoff_timing() {
        let config = FeedConfig {
            initial_backoff_ms: 100,
            max_reconnect_attempts: 5,
            ..Default::default()
        };
        let feed = ScrapeExchangeFeed::new(config);
        assert_eq!(feed.backoff_duration(0), Duration::from_millis(100));
        assert_eq!(feed.backoff_duration(1), Duration::from_millis(200));
        assert_eq!(feed.backoff_duration(2), Duration::from_millis(400));
        assert_eq!(feed.backoff_duration(20), Duration::from_millis(30_000));
    }

    /// schema_owner filtering passes matching payloads and drops others.
    #[test]
    fn test_client_filter_schema_owner() {
        let config = FeedConfig {
            filter: FeedFilter {
                schema_owner: Some("alice".to_string()),
                ..Default::default()
            },
            ..Default::default()
        };
        let feed = ScrapeExchangeFeed::new(config);
        let matching = StreamEvent {
            id: None,
            event_type: Some("upload".to_string()),
            data: r#"{"schema_owner":"alice","v":1}"#.to_string(),
        };
        let non_matching = StreamEvent {
            id: None,
            event_type: Some("upload".to_string()),
            data: r#"{"schema_owner":"bob","v":1}"#.to_string(),
        };
        assert!(feed.passes_client_filter(&matching), "alice should pass");
        assert!(!feed.passes_client_filter(&non_matching), "bob should fail");
    }

    /// End-to-end smoke test against the live feed; ignored in CI.
    #[tokio::test]
    #[ignore = "requires live Scrape Exchange WebSocket endpoint"]
    async fn test_live_feed_connect() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let feed = ScrapeExchangeFeed::new(FeedConfig::default());
        let events = feed
            .subscribe("wss://scrape.exchange/api/messages/v1", Some(1))
            .await?;
        assert!(!events.is_empty(), "expected at least one event");
        Ok(())
    }
}