Skip to main content

stygian_graph/adapters/
storage.rs

1//! Storage adapters — persist and retrieve pipeline [`StorageRecord`](crate::ports::storage::StorageRecord)s.
2//!
3//! # Adapters
4//!
5//! | Adapter | Availability | Backing store |
6//! | --------- | -------------- | --------------- |
7//! | [`NullStorage`](storage::NullStorage) | always | no-op (tests / dry-run) |
8//! | [`FileStorage`](storage::FileStorage) | always | `.jsonl` files on local disk |
9//! | `PostgresStorage` (`storage::postgres::PostgresStorage`) | `feature = "postgres"` | PostgreSQL via sqlx |
10
11use crate::domain::error::{Result, ServiceError, StygianError};
12use crate::ports::storage::{StoragePort, StorageRecord};
13use async_trait::async_trait;
14use std::path::PathBuf;
15use tokio::io::AsyncWriteExt;
16
17// ─────────────────────────────────────────────────────────────────────────────
18// NullStorage
19// ─────────────────────────────────────────────────────────────────────────────
20
/// No-op storage adapter — discards all records.
///
/// Useful for dry-run mode and unit tests where persistence is not required.
///
/// The type is a field-less unit struct, so it can be copied freely and
/// constructed either by name or through [`Default`].
///
/// # Example
///
/// ```
/// use stygian_graph::adapters::storage::NullStorage;
/// use stygian_graph::ports::storage::{StoragePort, StorageRecord};
/// use serde_json::json;
///
/// # tokio_test::block_on(async {
/// let s = NullStorage;
/// s.store(StorageRecord::new("p", "n", json!(null))).await.unwrap();
/// let result = s.retrieve("any-id").await.unwrap();
/// assert!(result.is_none());
/// # });
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct NullStorage;
40
41#[async_trait]
42impl StoragePort for NullStorage {
43    async fn store(&self, _record: StorageRecord) -> Result<()> {
44        Ok(())
45    }
46
47    async fn retrieve(&self, _id: &str) -> Result<Option<StorageRecord>> {
48        Ok(None)
49    }
50
51    async fn list(&self, _pipeline_id: &str) -> Result<Vec<StorageRecord>> {
52        Ok(vec![])
53    }
54
55    async fn delete(&self, _id: &str) -> Result<()> {
56        Ok(())
57    }
58}
59
60// ─────────────────────────────────────────────────────────────────────────────
61// FileStorage
62// ─────────────────────────────────────────────────────────────────────────────
63
/// File-based storage adapter — one `.jsonl` file per pipeline.
///
/// Each pipeline gets its own file: `<dir>/<pipeline_id>.jsonl`, where the
/// pipeline id has first been sanitised so it cannot contain path separators.
/// Records are appended one JSON object per line.
///
/// # Example
///
/// ```no_run
/// use stygian_graph::adapters::storage::FileStorage;
/// use stygian_graph::ports::storage::{StoragePort, StorageRecord};
/// use serde_json::json;
/// use std::path::PathBuf;
///
/// # tokio_test::block_on(async {
/// let storage = FileStorage::new(PathBuf::from("/tmp/stygian-results"));
/// let r = StorageRecord::new("pipe-1", "fetch", json!({"url": "https://example.com"}));
/// storage.store(r).await.unwrap();
/// # });
/// ```
#[derive(Debug, Clone)]
pub struct FileStorage {
    /// Base directory that holds the per-pipeline `.jsonl` files.
    dir: PathBuf,
}

impl FileStorage {
    /// Create a [`FileStorage`] backed by `dir`.
    ///
    /// The directory will be created on first write if it does not exist.
    ///
    /// # Example
    ///
    /// ```
    /// use stygian_graph::adapters::storage::FileStorage;
    /// use std::path::PathBuf;
    ///
    /// let s = FileStorage::new(PathBuf::from("/tmp/data"));
    /// ```
    #[must_use]
    pub const fn new(dir: PathBuf) -> Self {
        Self { dir }
    }

    /// Map `pipeline_id` to the path of its `.jsonl` file inside the base directory.
    ///
    /// Every `/`, `\`, `.` and `:` in the id is replaced by `_` before the
    /// `.jsonl` suffix is appended, so the resulting file name is a single path
    /// component directly under `dir`.
    ///
    /// The mapping is not injective: ids that differ only in those characters
    /// (for example `a/b` and `a_b`) resolve to the same file. An empty id
    /// resolves to a file named exactly `.jsonl`.
    fn pipeline_file(&self, pipeline_id: &str) -> PathBuf {
        // Sanitise: replace path separators so callers cannot escape the dir
        let safe_id = pipeline_id.replace(['/', '\\', '.', ':'], "_");
        self.dir.join(format!("{safe_id}.jsonl"))
    }
}
111
112#[async_trait]
113impl StoragePort for FileStorage {
114    async fn store(&self, record: StorageRecord) -> Result<()> {
115        tokio::fs::create_dir_all(&self.dir).await.map_err(|e| {
116            StygianError::Service(ServiceError::InvalidResponse(format!(
117                "FileStorage: create_dir_all failed: {e}"
118            )))
119        })?;
120
121        let path = self.pipeline_file(&record.pipeline_id);
122        let mut line = serde_json::to_string(&record).map_err(|e| {
123            StygianError::Service(ServiceError::InvalidResponse(format!(
124                "FileStorage: serialise record failed: {e}"
125            )))
126        })?;
127        line.push('\n');
128
129        let mut file = tokio::fs::OpenOptions::new()
130            .create(true)
131            .append(true)
132            .open(&path)
133            .await
134            .map_err(|e| {
135                StygianError::Service(ServiceError::InvalidResponse(format!(
136                    "FileStorage: open {}: {e}",
137                    path.display()
138                )))
139            })?;
140
141        file.write_all(line.as_bytes()).await.map_err(|e| {
142            StygianError::Service(ServiceError::InvalidResponse(format!(
143                "FileStorage: write failed: {e}"
144            )))
145        })?;
146
147        Ok(())
148    }
149
150    async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
151        // Scan all .jsonl files — linear scan is acceptable for moderate volumes
152        let Ok(mut dir) = tokio::fs::read_dir(&self.dir).await else {
153            return Ok(None);
154        };
155
156        while let Ok(Some(entry)) = dir.next_entry().await {
157            let path = entry.path();
158            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
159                continue;
160            }
161            let Ok(content) = tokio::fs::read_to_string(&path).await else {
162                continue;
163            };
164            for line in content.lines() {
165                if let Ok(record) = serde_json::from_str::<StorageRecord>(line)
166                    && record.id == id
167                {
168                    return Ok(Some(record));
169                }
170            }
171        }
172
173        Ok(None)
174    }
175
176    async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
177        let path = self.pipeline_file(pipeline_id);
178        let Ok(content) = tokio::fs::read_to_string(&path).await else {
179            return Ok(vec![]);
180        };
181
182        let records = content
183            .lines()
184            .filter(|l| !l.is_empty())
185            .filter_map(|line| serde_json::from_str::<StorageRecord>(line).ok())
186            .collect();
187
188        Ok(records)
189    }
190
191    async fn delete(&self, id: &str) -> Result<()> {
192        // Read-filter-rewrite strategy (adequate for typical pipeline sizes)
193        let Ok(mut dir) = tokio::fs::read_dir(&self.dir).await else {
194            return Ok(()); // dir does not exist → nothing to delete
195        };
196
197        while let Ok(Some(entry)) = dir.next_entry().await {
198            let path = entry.path();
199            if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
200                continue;
201            }
202            let Ok(content) = tokio::fs::read_to_string(&path).await else {
203                continue;
204            };
205
206            let (kept, found): (Vec<&str>, bool) = {
207                let mut found = false;
208                let kept = content
209                    .lines()
210                    .filter(|line| {
211                        if let Ok(r) = serde_json::from_str::<StorageRecord>(line)
212                            && r.id == id
213                        {
214                            found = true;
215                            return false;
216                        }
217                        true
218                    })
219                    .collect::<Vec<_>>();
220                (kept, found)
221            };
222
223            if found {
224                let new_content = kept.join("\n");
225                let new_content = if new_content.is_empty() {
226                    new_content
227                } else {
228                    format!("{new_content}\n")
229                };
230                tokio::fs::write(&path, new_content.as_bytes())
231                    .await
232                    .map_err(|e| {
233                        StygianError::Service(ServiceError::InvalidResponse(format!(
234                            "FileStorage: rewrite after delete failed: {e}"
235                        )))
236                    })?;
237                return Ok(());
238            }
239        }
240
241        Ok(())
242    }
243}
244
245// ─────────────────────────────────────────────────────────────────────────────
246// PostgresStorage — feature = "postgres"
247// ─────────────────────────────────────────────────────────────────────────────
248
249#[cfg(feature = "postgres")]
250pub use postgres::PostgresStorage;
251
#[cfg(feature = "postgres")]
mod postgres {
    //! PostgreSQL-backed storage via sqlx.
    //!
    //! Assumes the following table exists:
    //!
    //! ```sql
    //! CREATE TABLE IF NOT EXISTS pipeline_records (
    //!     id          TEXT PRIMARY KEY,
    //!     pipeline_id TEXT NOT NULL,
    //!     node_name   TEXT NOT NULL,
    //!     data        JSONB NOT NULL,
    //!     metadata    JSONB NOT NULL DEFAULT '{}',
    //!     timestamp_ms BIGINT NOT NULL
    //! );
    //! CREATE INDEX IF NOT EXISTS idx_pipeline_records_pipeline_id
    //!     ON pipeline_records (pipeline_id);
    //! ```

    use crate::domain::error::{Result, ServiceError, StygianError};
    use crate::ports::storage::{StoragePort, StorageRecord};
    use sqlx::{PgPool, Row};

    /// `PostgreSQL` storage adapter.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use stygian_graph::adapters::storage::PostgresStorage;
    /// use sqlx::PgPool;
    ///
    /// # tokio_test::block_on(async {
    /// let pool = PgPool::connect("postgres://localhost/stygian").await.unwrap();
    /// let storage = PostgresStorage::new(pool);
    /// # });
    /// ```
    #[derive(Debug, Clone)]
    pub struct PostgresStorage {
        /// Connection pool every query is executed against.
        pool: PgPool,
    }

    impl PostgresStorage {
        /// Create a new [`PostgresStorage`] from a connection pool.
        ///
        /// # Example
        ///
        /// ```no_run
        /// use stygian_graph::adapters::storage::PostgresStorage;
        /// use sqlx::PgPool;
        ///
        /// # tokio_test::block_on(async {
        /// let pool = PgPool::connect("postgres://localhost/stygian").await.unwrap();
        /// let s = PostgresStorage::new(pool);
        /// # });
        /// ```
        #[must_use]
        pub const fn new(pool: PgPool) -> Self {
            Self { pool }
        }
    }

    /// Build the error this adapter reports for a failed step.
    ///
    /// The message has the shape `PostgresStorage: <context>: <source>`.
    fn pg_error(context: &str, source: impl std::fmt::Display) -> StygianError {
        StygianError::Service(ServiceError::InvalidResponse(format!(
            "PostgresStorage: {context}: {source}"
        )))
    }

    /// Convert one `pipeline_records` row into a [`StorageRecord`].
    ///
    /// Columns are read with `try_get`, so a missing column or a value of the
    /// wrong type is returned as an error instead of panicking.
    ///
    /// Two conversions stay lenient: a `metadata` value that does not
    /// deserialise falls back to the field type's default, and a negative
    /// `timestamp_ms` falls back to `0`.
    fn row_to_record(row: &sqlx::postgres::PgRow) -> Result<StorageRecord> {
        fn decode_err(e: sqlx::Error) -> StygianError {
            pg_error("decode row failed", e)
        }

        let metadata_json = row
            .try_get::<serde_json::Value, _>("metadata")
            .map_err(decode_err)?;
        let metadata = serde_json::from_value(metadata_json).unwrap_or_default();

        let stored_ts = row.try_get::<i64, _>("timestamp_ms").map_err(decode_err)?;

        Ok(StorageRecord {
            id: row.try_get("id").map_err(decode_err)?,
            pipeline_id: row.try_get("pipeline_id").map_err(decode_err)?,
            node_name: row.try_get("node_name").map_err(decode_err)?,
            data: row.try_get("data").map_err(decode_err)?,
            metadata,
            timestamp_ms: u64::try_from(stored_ts).unwrap_or(0),
        })
    }

    #[async_trait::async_trait]
    impl StoragePort for PostgresStorage {
        /// Insert `record` into `pipeline_records`.
        ///
        /// A row with the same `id` is left untouched (`ON CONFLICT DO NOTHING`).
        /// A timestamp that does not fit in `i64` is stored as `i64::MAX`.
        ///
        /// # Errors
        ///
        /// Returns an error if the metadata cannot be serialised or the insert fails.
        async fn store(&self, record: StorageRecord) -> Result<()> {
            let metadata_json = serde_json::to_value(&record.metadata)
                .map_err(|e| pg_error("metadata serialise", e))?;

            sqlx::query(
                "
                INSERT INTO pipeline_records
                    (id, pipeline_id, node_name, data, metadata, timestamp_ms)
                VALUES ($1, $2, $3, $4, $5, $6)
                ON CONFLICT (id) DO NOTHING
                ",
            )
            .bind(&record.id)
            .bind(&record.pipeline_id)
            .bind(&record.node_name)
            .bind(&record.data)
            .bind(metadata_json)
            .bind(i64::try_from(record.timestamp_ms).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await
            .map_err(|e| pg_error("insert failed", e))?;

            Ok(())
        }

        /// Fetch the record with the given `id`, or `None` if no such row exists.
        ///
        /// # Errors
        ///
        /// Returns an error if the query fails or the row cannot be decoded.
        async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
            let row = sqlx::query(
                "
                SELECT id, pipeline_id, node_name, data, metadata, timestamp_ms
                FROM pipeline_records
                WHERE id = $1
                ",
            )
            .bind(id)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| pg_error("retrieve failed", e))?;

            row.as_ref().map(row_to_record).transpose()
        }

        /// Fetch every record of `pipeline_id`, ordered by ascending timestamp.
        ///
        /// # Errors
        ///
        /// Returns an error if the query fails or any row cannot be decoded.
        async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
            let rows = sqlx::query(
                "
                SELECT id, pipeline_id, node_name, data, metadata, timestamp_ms
                FROM pipeline_records
                WHERE pipeline_id = $1
                ORDER BY timestamp_ms ASC
                ",
            )
            .bind(pipeline_id)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| pg_error("list failed", e))?;

            rows.iter().map(row_to_record).collect()
        }

        /// Delete the row with the given `id`; deleting an unknown id succeeds.
        ///
        /// # Errors
        ///
        /// Returns an error if the delete statement fails.
        async fn delete(&self, id: &str) -> Result<()> {
            sqlx::query("DELETE FROM pipeline_records WHERE id = $1")
                .bind(id)
                .execute(&self.pool)
                .await
                .map_err(|e| pg_error("delete failed", e))?;

            Ok(())
        }
    }
}
430
431// ─────────────────────────────────────────────────────────────────────────────
432// Tests
433// ─────────────────────────────────────────────────────────────────────────────
434
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    //! Unit tests for [`NullStorage`] and [`FileStorage`].
    //!
    //! Every `FileStorage` test works inside its own temporary directory so the
    //! tests neither depend on nor leave behind any state on the host.

    use super::{FileStorage, NullStorage};
    use crate::ports::storage::{StoragePort, StorageRecord};
    use serde_json::json;

    #[tokio::test]
    async fn null_storage_store_and_retrieve() {
        let s = NullStorage;
        let r = StorageRecord::new("p", "n", json!(null));
        s.store(r.clone()).await.unwrap();
        let got = s.retrieve(&r.id).await.unwrap();
        assert!(got.is_none(), "NullStorage must always return None");
    }

    #[tokio::test]
    async fn null_storage_list_and_delete_are_noops() {
        let s = NullStorage;
        let list = s.list("any").await.unwrap();
        assert!(list.is_empty());
        s.delete("any-id").await.unwrap();
    }

    #[tokio::test]
    async fn file_storage_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        let r = StorageRecord::new(
            "pipe-roundtrip",
            "fetch",
            json!({"url": "https://example.com"}),
        );
        let id = r.id.clone();

        storage.store(r).await.unwrap();

        let retrieved = storage.retrieve(&id).await.unwrap().unwrap();
        assert_eq!(retrieved.id, id);
        assert_eq!(retrieved.pipeline_id, "pipe-roundtrip");
        assert_eq!(retrieved.node_name, "fetch");
    }

    #[tokio::test]
    async fn file_storage_list_scoped_to_pipeline() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        storage
            .store(StorageRecord::new("pipe-a", "step1", json!(1)))
            .await
            .unwrap();
        storage
            .store(StorageRecord::new("pipe-a", "step2", json!(2)))
            .await
            .unwrap();
        storage
            .store(StorageRecord::new("pipe-b", "step1", json!(3)))
            .await
            .unwrap();

        let pipe_a = storage.list("pipe-a").await.unwrap();
        assert_eq!(pipe_a.len(), 2);

        let pipe_b = storage.list("pipe-b").await.unwrap();
        assert_eq!(pipe_b.len(), 1);
    }

    #[tokio::test]
    async fn file_storage_delete_removes_record() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        let r1 = StorageRecord::new("pipe-del", "n", json!(1));
        let r2 = StorageRecord::new("pipe-del", "n", json!(2));
        let id1 = r1.id.clone();

        storage.store(r1).await.unwrap();
        storage.store(r2).await.unwrap();

        storage.delete(&id1).await.unwrap();

        let records = storage.list("pipe-del").await.unwrap();
        assert_eq!(records.len(), 1);
        assert_ne!(records[0].id, id1);
    }

    #[tokio::test]
    async fn file_storage_retrieve_not_found_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        let result = storage.retrieve("no-such-id").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn file_storage_path_sanitises_separators() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        // pipeline_id with slashes should not escape the base directory
        let r = StorageRecord::new("../../etc/passwd", "n", json!(null));
        storage.store(r).await.unwrap();

        let files: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .collect();
        // File must be inside the temp dir, not some other directory
        assert_eq!(files.len(), 1);
        // Each of the six `.`/`/` characters in front of `etc`, and the `/`
        // after it, must have become `_`; pin the exact name rather than just
        // looking for a double underscore.
        let fname = files[0].file_name();
        assert_eq!(
            fname.to_string_lossy(),
            "______etc_passwd.jsonl",
            "separators must be sanitised: got {fname:?}"
        );
    }

    #[tokio::test]
    async fn file_storage_retrieve_finds_correct_record() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());

        // Store records across two pipelines to exercise full-dir scan in retrieve
        let r1 = StorageRecord::new("pipe-x", "node-1", json!({"val": 1}));
        let r2 = StorageRecord::new("pipe-y", "node-2", json!({"val": 2}));
        let id1 = r1.id.clone();
        let id2 = r2.id.clone();

        storage.store(r1).await.unwrap();
        storage.store(r2).await.unwrap();

        let found = storage.retrieve(&id1).await.unwrap().unwrap();
        assert_eq!(found.id, id1);
        assert_eq!(found.pipeline_id, "pipe-x");

        let found2 = storage.retrieve(&id2).await.unwrap().unwrap();
        assert_eq!(found2.id, id2);
        assert_eq!(found2.pipeline_id, "pipe-y");
    }

    #[tokio::test]
    async fn file_storage_retrieve_missing_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        // Store something so the dir exists and the scan loop runs
        storage
            .store(StorageRecord::new("p", "n", json!(0)))
            .await
            .unwrap();
        let result = storage.retrieve("nonexistent-id").await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn file_storage_delete_nonexistent_dir_is_noop() {
        // Point the storage at a child of a fresh temp dir that is never
        // created. Unlike a fixed path under /tmp, this cannot already exist
        // on the host, so the "missing directory" branch is always exercised.
        let parent = tempfile::tempdir().unwrap();
        let missing = parent.path().join("never-created");
        let storage = FileStorage::new(missing.clone());

        // delete should return Ok without panicking …
        storage.delete("any-id").await.unwrap();
        // … and must not create the directory as a side effect.
        assert!(!missing.exists(), "delete must not create the directory");
    }

    #[tokio::test]
    async fn file_storage_delete_id_not_present_is_noop() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        let r = StorageRecord::new("pipe-z", "n", json!(42));
        storage.store(r).await.unwrap();
        // Deleting a non-existent id should not modify the file
        storage.delete("totally-unknown-id").await.unwrap();
        let records = storage.list("pipe-z").await.unwrap();
        assert_eq!(records.len(), 1);
    }

    #[tokio::test]
    async fn file_storage_list_missing_pipeline_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        let records = storage.list("never-stored").await.unwrap();
        assert!(records.is_empty());
    }
}
611        let storage = FileStorage::new(dir.path().to_path_buf());
612        let records = storage.list("never-stored").await.unwrap();
613        assert!(records.is_empty());
614    }
615}