Skip to main content

stygian_graph/ports/
storage.rs

1//! Storage port — persist and retrieve pipeline results.
2//!
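//! Defines the generic [`StoragePort`](crate::ports::storage::StoragePort) trait plus the
//! [`OutputFormatter`](crate::ports::storage::OutputFormatter) trait, implemented by
//! formatters that serialise pipeline outputs to CSV, JSONL, or JSON
//! (see [`OutputFormat`](crate::ports::storage::OutputFormat)).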
5//!
6//! # Architecture
7//!
8//! ```text
9//! stygian-graph
10//!   ├─ StoragePort (this file)             ← always compiled
11//!   └─ Adapters (adapters/)
12//!        ├─ FileStorage       (always)     → writes .jsonl to disk
13//!        ├─ NullStorage       (always)     → no-op for tests
14//!        └─ PostgresStorage   (feature="postgres")  → sqlx PgPool
15//! ```
16//!
17//! # Example — writing results
18//!
19//! ```no_run
20//! use stygian_graph::ports::storage::{StoragePort, StorageRecord};
21//! use serde_json::json;
22//!
23//! async fn persist<S: StoragePort>(storage: &S) {
24//!     let record = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
25//!     storage.store(record).await.unwrap();
26//! }
27//! ```
28
29use crate::domain::error::Result;
30use async_trait::async_trait;
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33use std::time::{SystemTime, UNIX_EPOCH};
34
35// ─────────────────────────────────────────────────────────────────────────────
36// StorageRecord
37// ─────────────────────────────────────────────────────────────────────────────
38
39/// A single result record produced by a pipeline node.
40///
41/// # Example
42///
43/// ```
44/// use stygian_graph::ports::storage::StorageRecord;
45/// use serde_json::json;
46///
47/// let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
48/// assert_eq!(r.pipeline_id, "pipe-1");
49/// assert_eq!(r.node_name,   "fetch");
50/// ```
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct StorageRecord {
53    /// Unique record ID (UUID v4)
54    pub id: String,
55    /// Pipeline this record belongs to
56    pub pipeline_id: String,
57    /// Graph node that produced this record
58    pub node_name: String,
59    /// Extracted data payload
60    pub data: Value,
61    /// Optional key-value metadata (headers, status code, …)
62    #[serde(default)]
63    pub metadata: std::collections::HashMap<String, String>,
64    /// Unix timestamp of when this record was created (milliseconds)
65    pub timestamp_ms: u64,
66}
67
68impl StorageRecord {
69    /// Construct a new record with a fresh UUID and current timestamp.
70    ///
71    /// # Example
72    ///
73    /// ```
74    /// use stygian_graph::ports::storage::StorageRecord;
75    /// use serde_json::json;
76    ///
77    /// let r = StorageRecord::new("p", "n", json!(null));
78    /// assert!(!r.id.is_empty());
79    /// assert!(r.timestamp_ms > 0);
80    /// ```
81    #[must_use]
82    pub fn new(pipeline_id: &str, node_name: &str, data: Value) -> Self {
83        let id = uuid::Uuid::new_v4().to_string();
84        let timestamp_ms = u64::try_from(
85            SystemTime::now()
86                .duration_since(UNIX_EPOCH)
87                .unwrap_or_default()
88                .as_millis(),
89        )
90        .unwrap_or(u64::MAX);
91        Self {
92            id,
93            pipeline_id: pipeline_id.to_string(),
94            node_name: node_name.to_string(),
95            data,
96            metadata: std::collections::HashMap::new(),
97            timestamp_ms,
98        }
99    }
100
101    /// Attach metadata key-value pairs.
102    ///
103    /// # Example
104    ///
105    /// ```
106    /// use stygian_graph::ports::storage::StorageRecord;
107    /// use serde_json::json;
108    ///
109    /// let r = StorageRecord::new("p", "n", json!(null))
110    ///     .with_metadata("status", "200");
111    /// assert_eq!(r.metadata["status"], "200");
112    /// ```
113    #[must_use]
114    pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
115        self.metadata.insert(key.to_string(), value.to_string());
116        self
117    }
118}
119
120// ─────────────────────────────────────────────────────────────────────────────
121// StoragePort
122// ─────────────────────────────────────────────────────────────────────────────
123
124/// Port: persist and retrieve [`StorageRecord`]s produced by pipelines.
125///
126/// # Example
127///
128/// ```no_run
129/// use stygian_graph::ports::storage::{StoragePort, StorageRecord};
130/// use serde_json::json;
131///
132/// async fn run<S: StoragePort>(storage: &S) {
133///     let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
134///     storage.store(r.clone()).await.unwrap();
135///
136///     let fetched = storage.retrieve(&r.id).await.unwrap().unwrap();
137///     assert_eq!(fetched.id, r.id);
138/// }
139/// ```
140#[async_trait]
141pub trait StoragePort: Send + Sync {
142    /// Persist a record.
143    ///
144    /// # Example
145    ///
146    /// ```no_run
147    /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
148    /// # use serde_json::json;
149    /// # async fn example(s: impl StoragePort) {
150    /// s.store(StorageRecord::new("p", "n", json!(null))).await.unwrap();
151    /// # }
152    /// ```
153    async fn store(&self, record: StorageRecord) -> Result<()>;
154
155    /// Retrieve a record by ID.  Returns `None` if not found.
156    ///
157    /// # Example
158    ///
159    /// ```no_run
160    /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
161    /// # use serde_json::json;
162    /// # async fn example(s: impl StoragePort) {
163    /// let maybe = s.retrieve("some-id").await.unwrap();
164    /// # }
165    /// ```
166    async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>>;
167
168    /// List all records for a given `pipeline_id`.
169    ///
170    /// # Example
171    ///
172    /// ```no_run
173    /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
174    /// # use serde_json::json;
175    /// # async fn example(s: impl StoragePort) {
176    /// let records = s.list("pipe-1").await.unwrap();
177    /// # }
178    /// ```
179    async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>>;
180
181    /// Delete a record by ID.  No-op if it does not exist.
182    ///
183    /// # Example
184    ///
185    /// ```no_run
186    /// # use stygian_graph::ports::storage::{StoragePort, StorageRecord};
187    /// # async fn example(s: impl StoragePort) {
188    /// s.delete("some-id").await.unwrap();
189    /// # }
190    /// ```
191    async fn delete(&self, id: &str) -> Result<()>;
192}
193
194// ─────────────────────────────────────────────────────────────────────────────
195// OutputFormat + OutputFormatter
196// ─────────────────────────────────────────────────────────────────────────────
197
198/// Supported serialisation formats for pipeline result export.
199///
200/// # Example
201///
202/// ```
203/// use stygian_graph::ports::storage::OutputFormat;
204///
205/// let fmt = OutputFormat::Jsonl;
206/// assert_eq!(fmt.extension(), "jsonl");
207/// ```
208#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
209#[serde(rename_all = "lowercase")]
210pub enum OutputFormat {
211    /// Newline-delimited JSON — one record per line
212    #[default]
213    Jsonl,
214    /// CSV — header row + comma-separated values
215    Csv,
216    /// Pretty-printed JSON array
217    Json,
218}
219
220impl OutputFormat {
221    /// File extension for this format.
222    ///
223    /// # Example
224    ///
225    /// ```
226    /// use stygian_graph::ports::storage::OutputFormat;
227    ///
228    /// assert_eq!(OutputFormat::Csv.extension(), "csv");
229    /// assert_eq!(OutputFormat::Json.extension(), "json");
230    /// assert_eq!(OutputFormat::Jsonl.extension(), "jsonl");
231    /// ```
232    #[must_use]
233    pub const fn extension(self) -> &'static str {
234        match self {
235            Self::Jsonl => "jsonl",
236            Self::Csv => "csv",
237            Self::Json => "json",
238        }
239    }
240}
241
242/// Port: serialise a slice of [`StorageRecord`]s to bytes in a given format.
243///
244/// # Example
245///
246/// ```
247/// use stygian_graph::ports::storage::{OutputFormat, OutputFormatter, StorageRecord};
248/// use stygian_graph::domain::error::Result;
249/// use serde_json::json;
250///
251/// struct JsonlFormatter;
252///
253/// impl OutputFormatter for JsonlFormatter {
254///     fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>> {
255///         let mut out = Vec::new();
256///         for r in records {
257///             let line = serde_json::to_string(r).unwrap();
258///             out.extend_from_slice(line.as_bytes());
259///             out.push(b'\n');
260///         }
261///         Ok(out)
262///     }
263///     fn format_type(&self) -> OutputFormat { OutputFormat::Jsonl }
264/// }
265/// ```
266pub trait OutputFormatter: Send + Sync {
267    /// Serialise `records` to owned bytes.
268    ///
269    /// # Errors
270    ///
271    /// Returns [`crate::domain::error::StygianError`] when serialisation fails
272    /// (for example an unwritable cell in CSV, a value the JSON encoder cannot
273    /// represent, etc.).
274    fn format(&self, records: &[StorageRecord]) -> Result<Vec<u8>>;
275
276    /// Which format this formatter produces.
277    fn format_type(&self) -> OutputFormat;
278}