stygian_graph/ports/data_sink.rs
//! DataSink port — outbound counterpart to [`DataSourcePort`](crate::ports::data_source::DataSourcePort).
//!
//! [`DataSinkPort`](crate::ports::data_sink::DataSinkPort) is the abstraction that lets pipeline nodes publish scraped
//! records to an external system without being coupled to any particular backend
//! (file system, webhook endpoint, message queue, Scrape Exchange, etc.).
//!
//! # Architecture
//!
//! Following the hexagonal architecture model:
//!
//! - This file lives in the **ports** layer — pure trait and data-type definitions, no I/O.
//! - Concrete adapters (file sink, HTTP sink, …) implement this trait and live
//!   under `adapters/`.
//!
//! # Example
//!
//! ```rust
//! use stygian_graph::ports::data_sink::{DataSinkPort, SinkRecord};
//!
//! // Any adapter that implements DataSinkPort can be used here.
//! async fn publish_one(sink: &dyn DataSinkPort, payload: serde_json::Value) {
//!     let record = SinkRecord::new("my-schema", "https://example.com", payload);
//!     match sink.publish(&record).await {
//!         Ok(receipt) => println!("Published: {}", receipt.id),
//!         Err(e) => eprintln!("Publish failed: {e}"),
//!     }
//! }
//! ```
29
30use std::collections::HashMap;
31
32use async_trait::async_trait;
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35
36// ── Error type ────────────────────────────────────────────────────────────────
37
38/// Errors that a [`DataSinkPort`] implementation may return.
39#[derive(Debug, Error)]
40#[non_exhaustive]
41pub enum DataSinkError {
42 /// The record failed structural or semantic validation before being sent.
43 #[error("validation failed: {0}")]
44 ValidationFailed(String),
45
46 /// The underlying transport or API rejected the publish request.
47 #[error("publish failed: {0}")]
48 PublishFailed(String),
49
50 /// The sink is temporarily rate-limited; caller should back off.
51 #[error("rate limited: {0}")]
52 RateLimited(String),
53
54 /// Authentication or authorisation rejected the request.
55 #[error("unauthorized: {0}")]
56 Unauthorized(String),
57
58 /// The referenced schema identifier is not known to this sink.
59 #[error("schema not found: {0}")]
60 SchemaNotFound(String),
61}
62
63// ── Domain types ──────────────────────────────────────────────────────────────
64
65/// A single structured record to be published through a [`DataSinkPort`].
66///
67/// # Example
68///
69/// ```rust
70/// use stygian_graph::ports::data_sink::SinkRecord;
71/// use serde_json::json;
72///
73/// let record = SinkRecord::new(
74/// "product-v1",
75/// "https://shop.example.com/items/42",
76/// json!({ "sku": "ABC-42", "price": 9.99 }),
77/// );
78/// assert_eq!(record.schema_id, "product-v1");
79/// ```
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct SinkRecord {
82 /// The payload to publish. Any JSON value is accepted.
83 pub data: serde_json::Value,
84
85 /// Identifies the schema or data-contract version this record conforms to.
86 /// Sinks may use this for routing, validation, or schema-registry lookups.
87 pub schema_id: String,
88
89 /// The canonical URL the record was scraped from. Used for provenance and
90 /// deduplication. Stored as a `String` to avoid a `url` crate dependency
91 /// in the port layer.
92 pub source_url: String,
93
94 /// Arbitrary string key-value metadata (content-type, run-id, tenant, …).
95 pub metadata: HashMap<String, String>,
96}
97
98impl SinkRecord {
99 /// Construct a new [`SinkRecord`] with empty metadata.
100 ///
101 /// # Example
102 ///
103 /// ```rust
104 /// use stygian_graph::ports::data_sink::SinkRecord;
105 ///
106 /// let r = SinkRecord::new("schema-v1", "https://example.com/page", serde_json::Value::Null);
107 /// assert!(r.metadata.is_empty());
108 /// ```
109 pub fn new(
110 schema_id: impl Into<String>,
111 source_url: impl Into<String>,
112 data: serde_json::Value,
113 ) -> Self {
114 Self {
115 data,
116 schema_id: schema_id.into(),
117 source_url: source_url.into(),
118 metadata: HashMap::new(),
119 }
120 }
121
122 /// Attach a metadata entry and return `self` for builder-style use.
123 ///
124 /// # Example
125 ///
126 /// ```rust
127 /// use stygian_graph::ports::data_sink::SinkRecord;
128 ///
129 /// let r = SinkRecord::new("s", "https://x.com", serde_json::Value::Null)
130 /// .with_meta("run_id", "abc123");
131 /// assert_eq!(r.metadata["run_id"], "abc123");
132 /// ```
133 #[must_use]
134 pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
135 self.metadata.insert(key.into(), value.into());
136 self
137 }
138}
139
140/// Confirmation that a [`SinkRecord`] was successfully accepted by the sink.
141///
142/// # Example
143///
144/// ```rust
145/// use stygian_graph::ports::data_sink::SinkReceipt;
146///
147/// let receipt = SinkReceipt {
148/// id: "rec-001".to_string(),
149/// published_at: "2026-04-09T00:00:00Z".to_string(),
150/// platform: "file-sink".to_string(),
151/// };
152/// assert_eq!(receipt.platform, "file-sink");
153/// ```
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct SinkReceipt {
156 /// Platform-assigned identifier for this published record.
157 pub id: String,
158
159 /// ISO 8601 timestamp at which the sink accepted the record.
160 pub published_at: String,
161
162 /// Human-readable name of the sink platform (e.g. `"scrape-exchange"`, `"file"`).
163 pub platform: String,
164}
165
166// ── Port trait ────────────────────────────────────────────────────────────────
167
168/// Outbound data sink port — publish scraped records to an external system.
169///
170/// Implementations live in `adapters/` and are never imported by domain code.
171/// The port is always injected via `Arc<dyn DataSinkPort>`.
172///
173/// # Object safety
174///
175/// Native `async fn` in traits is not object-safe by itself. This trait uses
176/// `#[async_trait]`, which erases async methods into boxed futures and enables
177/// usage as `dyn DataSinkPort` through `Arc` in this workspace.
178///
179/// # Example
180///
181/// ```rust
182/// use stygian_graph::ports::data_sink::{DataSinkPort, SinkRecord, SinkReceipt, DataSinkError};
183///
184/// struct NoopSink;
185///
186/// #[async_trait::async_trait]
187/// impl DataSinkPort for NoopSink {
188/// async fn publish(&self, _record: &SinkRecord) -> Result<SinkReceipt, DataSinkError> {
189/// Ok(SinkReceipt {
190/// id: "noop".to_string(),
191/// published_at: "".to_string(),
192/// platform: "noop".to_string(),
193/// })
194/// }
195///
196/// async fn validate(&self, _record: &SinkRecord) -> Result<(), DataSinkError> {
197/// Ok(())
198/// }
199///
200/// async fn health_check(&self) -> Result<(), DataSinkError> {
201/// Ok(())
202/// }
203/// }
204/// ```
205#[async_trait]
206pub trait DataSinkPort: Send + Sync {
207 /// Validate and publish `record` to the sink.
208 ///
209 /// Implementations should validate the record before publishing; failing
210 /// fast with [`DataSinkError::ValidationFailed`] is preferred over sending
211 /// invalid data downstream.
212 ///
213 /// # Errors
214 ///
215 /// Returns [`DataSinkError`] on validation failure, transport error, or
216 /// rate-limit/auth rejection.
217 async fn publish(&self, record: &SinkRecord) -> Result<SinkReceipt, DataSinkError>;
218
219 /// Validate `record` without publishing it.
220 ///
221 /// Useful for preflight checks without side effects.
222 ///
223 /// # Errors
224 ///
225 /// Returns [`DataSinkError::ValidationFailed`] if the record is malformed
226 /// or violates schema constraints.
227 async fn validate(&self, record: &SinkRecord) -> Result<(), DataSinkError>;
228
229 /// Check that the sink backend is reachable and healthy.
230 ///
231 /// # Errors
232 ///
233 /// Returns [`DataSinkError::PublishFailed`] or [`DataSinkError::Unauthorized`]
234 /// if the backend is unreachable or misconfigured.
235 async fn health_check(&self) -> Result<(), DataSinkError>;
236}
237
238// ── Tests ─────────────────────────────────────────────────────────────────────
239
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{Value, json};

    /// Shared return type so the serde tests can propagate errors with `?`.
    type TestResult = std::result::Result<(), Box<dyn std::error::Error>>;

    /// Builds a record with two metadata entries, checks every field, then
    /// verifies the identifying fields and one metadata entry survive a JSON
    /// round trip.
    #[test]
    fn sink_record_construction_and_serde_roundtrip() -> TestResult {
        let original = SinkRecord::new(
            "product-v1",
            "https://shop.example.com/items/42",
            json!({ "sku": "ABC-42", "price": 9.99 }),
        )
        .with_meta("run_id", "abc123")
        .with_meta("tenant", "acme");

        assert_eq!(original.schema_id, "product-v1");
        assert_eq!(original.source_url, "https://shop.example.com/items/42");

        let sku = original.data.get("sku").and_then(Value::as_str);
        assert_eq!(sku, Some("ABC-42"));

        for (key, expected) in [("run_id", "abc123"), ("tenant", "acme")] {
            let actual = original.metadata.get(key).map(String::as_str);
            assert_eq!(actual, Some(expected));
        }

        // Round-trip through JSON
        let encoded = serde_json::to_string(&original)?;
        let decoded: SinkRecord = serde_json::from_str(&encoded)?;

        assert_eq!(decoded.schema_id, original.schema_id);
        assert_eq!(decoded.source_url, original.source_url);

        let run_id = decoded.metadata.get("run_id").map(String::as_str);
        assert_eq!(run_id, Some("abc123"));
        Ok(())
    }

    /// A receipt's `id` and `platform` survive a JSON round trip.
    #[test]
    fn sink_receipt_serde_roundtrip() -> TestResult {
        let original = SinkReceipt {
            id: "rec-001".to_string(),
            published_at: "2026-04-09T00:00:00Z".to_string(),
            platform: "test-sink".to_string(),
        };

        let encoded = serde_json::to_string(&original)?;
        let decoded: SinkReceipt = serde_json::from_str(&encoded)?;

        assert_eq!(decoded.id, original.id);
        assert_eq!(decoded.platform, original.platform);
        Ok(())
    }

    /// Each error variant renders as its fixed prefix followed by the detail string.
    #[test]
    fn data_sink_error_display() {
        let cases = [
            (
                DataSinkError::ValidationFailed("missing field".to_string()),
                "validation failed: missing field",
            ),
            (
                DataSinkError::PublishFailed("timeout".to_string()),
                "publish failed: timeout",
            ),
            (
                DataSinkError::RateLimited("429".to_string()),
                "rate limited: 429",
            ),
            (
                DataSinkError::Unauthorized("401".to_string()),
                "unauthorized: 401",
            ),
            (
                DataSinkError::SchemaNotFound("v99".to_string()),
                "schema not found: v99",
            ),
        ];

        for (error, expected) in cases {
            assert_eq!(error.to_string(), expected);
        }
    }
}
323}