Skip to main content

stygian_graph/ports/
data_sink.rs

1//! DataSink port — outbound counterpart to [`DataSourcePort`](crate::ports::data_source::DataSourcePort).
2//!
3//! [`DataSinkPort`](crate::ports::data_sink::DataSinkPort) is the abstraction that lets pipeline nodes publish scraped
4//! records to an external system without being coupled to any particular backend
5//! (file system, webhook endpoint, message queue, Scrape Exchange, etc.).
6//!
7//! # Architecture
8//!
9//! Following the hexagonal architecture model:
10//!
11//! - This file lives in the **ports** layer — pure trait definitions, no I/O.
12//! - Concrete adapters (file sink, HTTP sink, …) implement this trait and live
13//!   under `adapters/`.
14//!
15//! # Example
16//!
17//! ```rust
18//! use stygian_graph::ports::data_sink::{DataSinkPort, SinkRecord};
19//!
20//! // Any adapter that implements DataSinkPort can be used here.
21//! async fn publish_one(sink: &dyn DataSinkPort, payload: serde_json::Value) {
22//!     let record = SinkRecord::new("my-schema", "https://example.com", payload);
23//!     match sink.publish(&record).await {
24//!         Ok(receipt) => println!("Published: {}", receipt.id),
25//!         Err(e) => eprintln!("Publish failed: {e}"),
26//!     }
27//! }
28//! ```
29
30use std::collections::HashMap;
31
32use async_trait::async_trait;
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35
36// ── Error type ────────────────────────────────────────────────────────────────
37
38/// Errors that a [`DataSinkPort`] implementation may return.
39#[derive(Debug, Error)]
40#[non_exhaustive]
41pub enum DataSinkError {
42    /// The record failed structural or semantic validation before being sent.
43    #[error("validation failed: {0}")]
44    ValidationFailed(String),
45
46    /// The underlying transport or API rejected the publish request.
47    #[error("publish failed: {0}")]
48    PublishFailed(String),
49
50    /// The sink is temporarily rate-limited; caller should back off.
51    #[error("rate limited: {0}")]
52    RateLimited(String),
53
54    /// Authentication or authorisation rejected the request.
55    #[error("unauthorized: {0}")]
56    Unauthorized(String),
57
58    /// The referenced schema identifier is not known to this sink.
59    #[error("schema not found: {0}")]
60    SchemaNotFound(String),
61}
62
63// ── Domain types ──────────────────────────────────────────────────────────────
64
65/// A single structured record to be published through a [`DataSinkPort`].
66///
67/// # Example
68///
69/// ```rust
70/// use stygian_graph::ports::data_sink::SinkRecord;
71/// use serde_json::json;
72///
73/// let record = SinkRecord::new(
74///     "product-v1",
75///     "https://shop.example.com/items/42",
76///     json!({ "sku": "ABC-42", "price": 9.99 }),
77/// );
78/// assert_eq!(record.schema_id, "product-v1");
79/// ```
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct SinkRecord {
82    /// The payload to publish. Any JSON value is accepted.
83    pub data: serde_json::Value,
84
85    /// Identifies the schema or data-contract version this record conforms to.
86    /// Sinks may use this for routing, validation, or schema-registry lookups.
87    pub schema_id: String,
88
89    /// The canonical URL the record was scraped from. Used for provenance and
90    /// deduplication. Stored as a `String` to avoid a `url` crate dependency
91    /// in the port layer.
92    pub source_url: String,
93
94    /// Arbitrary string key-value metadata (content-type, run-id, tenant, …).
95    pub metadata: HashMap<String, String>,
96}
97
98impl SinkRecord {
99    /// Construct a new [`SinkRecord`] with empty metadata.
100    ///
101    /// # Example
102    ///
103    /// ```rust
104    /// use stygian_graph::ports::data_sink::SinkRecord;
105    ///
106    /// let r = SinkRecord::new("schema-v1", "https://example.com/page", serde_json::Value::Null);
107    /// assert!(r.metadata.is_empty());
108    /// ```
109    pub fn new(
110        schema_id: impl Into<String>,
111        source_url: impl Into<String>,
112        data: serde_json::Value,
113    ) -> Self {
114        Self {
115            data,
116            schema_id: schema_id.into(),
117            source_url: source_url.into(),
118            metadata: HashMap::new(),
119        }
120    }
121
122    /// Attach a metadata entry and return `self` for builder-style use.
123    ///
124    /// # Example
125    ///
126    /// ```rust
127    /// use stygian_graph::ports::data_sink::SinkRecord;
128    ///
129    /// let r = SinkRecord::new("s", "https://x.com", serde_json::Value::Null)
130    ///     .with_meta("run_id", "abc123");
131    /// assert_eq!(r.metadata["run_id"], "abc123");
132    /// ```
133    #[must_use]
134    pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
135        self.metadata.insert(key.into(), value.into());
136        self
137    }
138}
139
140/// Confirmation that a [`SinkRecord`] was successfully accepted by the sink.
141///
142/// # Example
143///
144/// ```rust
145/// use stygian_graph::ports::data_sink::SinkReceipt;
146///
147/// let receipt = SinkReceipt {
148///     id: "rec-001".to_string(),
149///     published_at: "2026-04-09T00:00:00Z".to_string(),
150///     platform: "file-sink".to_string(),
151/// };
152/// assert_eq!(receipt.platform, "file-sink");
153/// ```
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct SinkReceipt {
156    /// Platform-assigned identifier for this published record.
157    pub id: String,
158
159    /// ISO 8601 timestamp at which the sink accepted the record.
160    pub published_at: String,
161
162    /// Human-readable name of the sink platform (e.g. `"scrape-exchange"`, `"file"`).
163    pub platform: String,
164}
165
166// ── Port trait ────────────────────────────────────────────────────────────────
167
168/// Outbound data sink port — publish scraped records to an external system.
169///
170/// Implementations live in `adapters/` and are never imported by domain code.
171/// The port is always injected via `Arc<dyn DataSinkPort>`.
172///
173/// # Object safety
174///
175/// Native `async fn` in traits is not object-safe by itself. This trait uses
176/// `#[async_trait]`, which erases async methods into boxed futures and enables
177/// usage as `dyn DataSinkPort` through `Arc` in this workspace.
178///
179/// # Example
180///
181/// ```rust
182/// use stygian_graph::ports::data_sink::{DataSinkPort, SinkRecord, SinkReceipt, DataSinkError};
183///
184/// struct NoopSink;
185///
186/// #[async_trait::async_trait]
187/// impl DataSinkPort for NoopSink {
188///     async fn publish(&self, _record: &SinkRecord) -> Result<SinkReceipt, DataSinkError> {
189///         Ok(SinkReceipt {
190///             id: "noop".to_string(),
191///             published_at: "".to_string(),
192///             platform: "noop".to_string(),
193///         })
194///     }
195///
196///     async fn validate(&self, _record: &SinkRecord) -> Result<(), DataSinkError> {
197///         Ok(())
198///     }
199///
200///     async fn health_check(&self) -> Result<(), DataSinkError> {
201///         Ok(())
202///     }
203/// }
204/// ```
205#[async_trait]
206pub trait DataSinkPort: Send + Sync {
207    /// Validate and publish `record` to the sink.
208    ///
209    /// Implementations should validate the record before publishing; failing
210    /// fast with [`DataSinkError::ValidationFailed`] is preferred over sending
211    /// invalid data downstream.
212    ///
213    /// # Errors
214    ///
215    /// Returns [`DataSinkError`] on validation failure, transport error, or
216    /// rate-limit/auth rejection.
217    async fn publish(&self, record: &SinkRecord) -> Result<SinkReceipt, DataSinkError>;
218
219    /// Validate `record` without publishing it.
220    ///
221    /// Useful for preflight checks without side effects.
222    ///
223    /// # Errors
224    ///
225    /// Returns [`DataSinkError::ValidationFailed`] if the record is malformed
226    /// or violates schema constraints.
227    async fn validate(&self, record: &SinkRecord) -> Result<(), DataSinkError>;
228
229    /// Check that the sink backend is reachable and healthy.
230    ///
231    /// # Errors
232    ///
233    /// Returns [`DataSinkError::PublishFailed`] or [`DataSinkError::Unauthorized`]
234    /// if the backend is unreachable or misconfigured.
235    async fn health_check(&self) -> Result<(), DataSinkError>;
236}
237
238// ── Tests ─────────────────────────────────────────────────────────────────────
239
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{Value, json};

    #[test]
    fn sink_record_construction_and_serde_roundtrip()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let record = SinkRecord::new(
            "product-v1",
            "https://shop.example.com/items/42",
            json!({ "sku": "ABC-42", "price": 9.99 }),
        )
        .with_meta("run_id", "abc123")
        .with_meta("tenant", "acme");

        assert_eq!(record.schema_id, "product-v1");
        assert_eq!(record.source_url, "https://shop.example.com/items/42");
        assert_eq!(
            record.data.get("sku").and_then(Value::as_str),
            Some("ABC-42")
        );
        assert_eq!(
            record.metadata.get("run_id").map(String::as_str),
            Some("abc123")
        );
        assert_eq!(
            record.metadata.get("tenant").map(String::as_str),
            Some("acme")
        );

        // Round-trip through JSON: every field, including the payload and the
        // full metadata map, must survive unchanged.
        let json_str = serde_json::to_string(&record)?;
        let restored: SinkRecord = serde_json::from_str(&json_str)?;

        assert_eq!(restored.schema_id, record.schema_id);
        assert_eq!(restored.source_url, record.source_url);
        assert_eq!(restored.data, record.data);
        assert_eq!(restored.metadata, record.metadata);
        Ok(())
    }

    #[test]
    fn sink_record_new_starts_with_empty_metadata() {
        let record = SinkRecord::new("s", "https://x.com", Value::Null);

        assert!(record.metadata.is_empty());
        assert_eq!(record.data, Value::Null);
    }

    #[test]
    fn with_meta_overwrites_existing_key() {
        let record = SinkRecord::new("s", "https://x.com", Value::Null)
            .with_meta("run_id", "first")
            .with_meta("run_id", "second");

        // The second insert replaces the first rather than adding a duplicate.
        assert_eq!(record.metadata.len(), 1);
        assert_eq!(
            record.metadata.get("run_id").map(String::as_str),
            Some("second")
        );
    }

    #[test]
    fn sink_receipt_serde_roundtrip() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let receipt = SinkReceipt {
            id: "rec-001".to_string(),
            published_at: "2026-04-09T00:00:00Z".to_string(),
            platform: "test-sink".to_string(),
        };

        let json_str = serde_json::to_string(&receipt)?;
        let restored: SinkReceipt = serde_json::from_str(&json_str)?;

        assert_eq!(restored.id, receipt.id);
        assert_eq!(restored.published_at, receipt.published_at);
        assert_eq!(restored.platform, receipt.platform);
        Ok(())
    }

    #[test]
    fn data_sink_error_display() {
        assert_eq!(
            DataSinkError::ValidationFailed("missing field".to_string()).to_string(),
            "validation failed: missing field"
        );
        assert_eq!(
            DataSinkError::PublishFailed("timeout".to_string()).to_string(),
            "publish failed: timeout"
        );
        assert_eq!(
            DataSinkError::RateLimited("429".to_string()).to_string(),
            "rate limited: 429"
        );
        assert_eq!(
            DataSinkError::Unauthorized("401".to_string()).to_string(),
            "unauthorized: 401"
        );
        assert_eq!(
            DataSinkError::SchemaNotFound("v99".to_string()).to_string(),
            "schema not found: v99"
        );
    }
}