use std::collections::HashMap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Errors that a data sink can report.
///
/// Every variant wraps a free-form message; the `Display` output is the
/// variant's fixed prefix followed by that message. The enum is
/// `#[non_exhaustive]`, so downstream `match` expressions need a wildcard arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum DataSinkError {
    /// Displayed as `validation failed: <message>`.
    #[error("validation failed: {0}")]
    ValidationFailed(String),

    /// Displayed as `publish failed: <message>`.
    #[error("publish failed: {0}")]
    PublishFailed(String),

    /// Displayed as `rate limited: <message>`.
    #[error("rate limited: {0}")]
    RateLimited(String),

    /// Displayed as `unauthorized: <message>`.
    #[error("unauthorized: {0}")]
    Unauthorized(String),

    /// Displayed as `schema not found: <message>`.
    #[error("schema not found: {0}")]
    SchemaNotFound(String),
}
/// A single record handed to a data sink.
///
/// The type is serializable with `serde` and comparable with `==`, so whole
/// records can be checked for equality (for example after a round-trip).
/// Field order is part of the serialized layout and must not be rearranged.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SinkRecord {
    /// Arbitrary JSON payload of the record.
    pub data: serde_json::Value,
    /// Identifier of the schema the payload refers to.
    pub schema_id: String,
    /// URL string recorded as the origin of the payload (not validated here).
    pub source_url: String,
    /// Free-form string key/value annotations attached to the record.
    pub metadata: HashMap<String, String>,
}
impl SinkRecord {
    /// Creates a record with the given schema id, source URL and JSON payload.
    ///
    /// The metadata map starts out empty; use [`SinkRecord::with_meta`] to
    /// add entries.
    #[must_use]
    pub fn new(
        schema_id: impl Into<String>,
        source_url: impl Into<String>,
        data: serde_json::Value,
    ) -> Self {
        Self {
            data,
            schema_id: schema_id.into(),
            source_url: source_url.into(),
            metadata: HashMap::new(),
        }
    }

    /// Returns the record with the `key`/`value` pair added to its metadata.
    ///
    /// Inserting a key that is already present replaces the earlier value.
    /// The method consumes and returns `self` so calls can be chained.
    #[must_use]
    pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.insert(key.into(), value.into());
        self
    }
}
/// Acknowledgement returned by a sink after a successful publish.
///
/// All fields are plain strings, so the type derives `PartialEq`/`Eq` and
/// receipts can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SinkReceipt {
    /// Identifier of the receipt.
    pub id: String,
    /// Publication time as a string.
    // NOTE(review): the tests use an RFC 3339-style value such as
    // "2026-04-09T00:00:00Z"; the format is not enforced here — confirm.
    pub published_at: String,
    /// Name of the platform the record was published to.
    pub platform: String,
}
/// Asynchronous port through which [`SinkRecord`]s are delivered to a sink.
///
/// Implementors must be `Send + Sync`. Every method reports failure as a
/// [`DataSinkError`]; which variants a given implementation produces is up
/// to that implementation.
#[async_trait]
pub trait DataSinkPort: Send + Sync {
    /// Publishes `record`, yielding a [`SinkReceipt`] on success.
    ///
    /// # Errors
    ///
    /// Returns a [`DataSinkError`] when the record could not be published.
    async fn publish(&self, record: &SinkRecord) -> Result<SinkReceipt, DataSinkError>;

    /// Checks `record` without returning a receipt.
    ///
    /// # Errors
    ///
    /// Returns a [`DataSinkError`] when the record is rejected.
    async fn validate(&self, record: &SinkRecord) -> Result<(), DataSinkError>;

    /// Reports whether the sink is currently usable.
    ///
    /// # Errors
    ///
    /// Returns a [`DataSinkError`] when the check fails.
    async fn health_check(&self) -> Result<(), DataSinkError>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{Value, json};

    /// Builds a record through the public constructor and builder, then
    /// checks that every field survives a JSON round-trip.
    #[test]
    fn sink_record_construction_and_serde_roundtrip()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new(
            "product-v1",
            "https://shop.example.com/items/42",
            json!({ "sku": "ABC-42", "price": 9.99 }),
        )
        .with_meta("run_id", "abc123")
        .with_meta("tenant", "acme");

        assert_eq!(record.schema_id, "product-v1");
        assert_eq!(record.source_url, "https://shop.example.com/items/42");
        assert_eq!(
            record.data.get("sku").and_then(Value::as_str),
            Some("ABC-42")
        );
        assert_eq!(
            record.metadata.get("run_id").map(String::as_str),
            Some("abc123")
        );
        assert_eq!(
            record.metadata.get("tenant").map(String::as_str),
            Some("acme")
        );
        // Exactly the two entries added above, nothing implicit.
        assert_eq!(record.metadata.len(), 2);

        let json_str = serde_json::to_string(&record)?;
        let restored: SinkRecord = serde_json::from_str(&json_str)?;
        assert_eq!(restored.schema_id, record.schema_id);
        assert_eq!(restored.source_url, record.source_url);
        // The payload and the complete metadata map must round-trip too;
        // checking a single key would let a dropped field go unnoticed.
        assert_eq!(restored.data, record.data);
        assert_eq!(restored.metadata, record.metadata);
        Ok(())
    }

    /// Checks that every field of a receipt survives a JSON round-trip.
    #[test]
    fn sink_receipt_serde_roundtrip() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let receipt = SinkReceipt {
            id: "rec-001".to_string(),
            published_at: "2026-04-09T00:00:00Z".to_string(),
            platform: "test-sink".to_string(),
        };

        let json_str = serde_json::to_string(&receipt)?;
        let restored: SinkReceipt = serde_json::from_str(&json_str)?;
        assert_eq!(restored.id, receipt.id);
        assert_eq!(restored.published_at, receipt.published_at);
        assert_eq!(restored.platform, receipt.platform);
        Ok(())
    }

    /// Pins the `Display` text of every error variant.
    #[test]
    fn data_sink_error_display() {
        assert_eq!(
            DataSinkError::ValidationFailed("missing field".to_string()).to_string(),
            "validation failed: missing field"
        );
        assert_eq!(
            DataSinkError::PublishFailed("timeout".to_string()).to_string(),
            "publish failed: timeout"
        );
        assert_eq!(
            DataSinkError::RateLimited("429".to_string()).to_string(),
            "rate limited: 429"
        );
        assert_eq!(
            DataSinkError::Unauthorized("401".to_string()).to_string(),
            "unauthorized: 401"
        );
        assert_eq!(
            DataSinkError::SchemaNotFound("v99".to_string()).to_string(),
            "schema not found: v99"
        );
    }
}