Skip to main content

shiplog_ingest_json/
lib.rs

1//! JSONL/JSON ingestion adapter for prebuilt shiplog ledgers.
2//!
3//! Loads canonical ledger artifacts and returns
4//! them through the [`shiplog_ports::Ingestor`] interface.
5
6use anyhow::{Context, Result};
7use chrono::Utc;
8use shiplog_ports::{IngestOutput, Ingestor};
9use shiplog_schema::coverage::CoverageManifest;
10use shiplog_schema::event::EventEnvelope;
11use shiplog_schema::freshness::{FreshnessStatus, SourceFreshness};
12use std::path::PathBuf;
13
/// Simple adapter that ingests JSONL events + a JSON coverage manifest.
///
/// `events_path` points at a JSON Lines file in which every non-blank line is one
/// serialized [`EventEnvelope`]; `coverage_path` points at a single JSON-encoded
/// [`CoverageManifest`]. Both files are read in full on every call to
/// [`Ingestor::ingest`].
///
/// This is useful for:
/// - tests
/// - fixtures
/// - future "org mode" where an upstream collector produces a ledger and shiplog just renders
///
/// # Examples
///
/// ```rust,no_run
/// use shiplog_ingest_json::JsonIngestor;
/// use shiplog_ports::Ingestor;
/// use std::path::PathBuf;
///
/// let ingestor = JsonIngestor {
///     events_path: PathBuf::from("ledger.events.jsonl"),
///     coverage_path: PathBuf::from("coverage.manifest.json"),
/// };
/// let output = ingestor.ingest()?;
/// println!("Loaded {} events", output.events.len());
/// # Ok::<(), anyhow::Error>(())
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonIngestor {
    /// Path to the JSONL event ledger (one `EventEnvelope` per non-blank line).
    pub events_path: PathBuf,
    /// Path to the JSON coverage manifest.
    pub coverage_path: PathBuf,
}
40
41impl Ingestor for JsonIngestor {
42    fn ingest(&self) -> Result<IngestOutput> {
43        let events = read_events(&self.events_path)?;
44        let coverage = read_coverage(&self.coverage_path)?;
45        let freshness = vec![SourceFreshness {
46            source: "json_import".to_string(),
47            status: FreshnessStatus::Fresh,
48            cache_hits: 0,
49            cache_misses: 0,
50            fetched_at: Some(Utc::now()),
51            reason: None,
52        }];
53        Ok(IngestOutput {
54            events,
55            coverage,
56            freshness,
57        })
58    }
59}
60
61/// Parse JSONL text into a vector of [`EventEnvelope`]s.
62///
63/// Each non-empty line is parsed as a JSON-encoded `EventEnvelope`.
64/// `source` is included in error context messages.
65///
66/// # Examples
67///
68/// ```
69/// use shiplog_ingest_json::parse_events_jsonl;
70///
71/// // Empty input yields no events:
72/// let events = parse_events_jsonl("", "test").unwrap();
73/// assert!(events.is_empty());
74///
75/// // Blank lines are silently skipped:
76/// let events = parse_events_jsonl("\n  \n", "test").unwrap();
77/// assert!(events.is_empty());
78/// ```
79pub fn parse_events_jsonl(text: &str, source: &str) -> Result<Vec<EventEnvelope>> {
80    let mut out = Vec::new();
81    for (i, line) in text.lines().enumerate() {
82        if line.trim().is_empty() {
83            continue;
84        }
85        let ev: EventEnvelope = serde_json::from_str(line)
86            .with_context(|| format!("parse event json line {} in {source}", i + 1))?;
87        out.push(ev);
88    }
89    Ok(out)
90}
91
92fn read_events(path: &PathBuf) -> Result<Vec<EventEnvelope>> {
93    let text = std::fs::read_to_string(path).with_context(|| format!("read {path:?}"))?;
94    parse_events_jsonl(&text, &format!("{path:?}"))
95}
96
97fn read_coverage(path: &PathBuf) -> Result<CoverageManifest> {
98    let text = std::fs::read_to_string(path).with_context(|| format!("read {path:?}"))?;
99    let cov: CoverageManifest =
100        serde_json::from_str(&text).with_context(|| format!("parse coverage manifest {path:?}"))?;
101    Ok(cov)
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{NaiveDate, Utc};
    use shiplog_bundle::{FILE_COVERAGE_MANIFEST_JSON, FILE_LEDGER_EVENTS_JSONL};
    use shiplog_ids::{EventId, RunId};
    use shiplog_schema::coverage::{Completeness, CoverageManifest, TimeWindow};
    use shiplog_schema::event::*;
    use std::io::Write;

    fn make_test_event(repo_name: &str, event_id: &str) -> EventEnvelope {
        EventEnvelope {
            id: EventId::from_parts(["test", event_id]),
            kind: EventKind::PullRequest,
            occurred_at: Utc::now(),
            actor: Actor {
                login: "testuser".into(),
                id: None,
            },
            repo: RepoRef {
                full_name: repo_name.into(),
                html_url: Some(format!("https://github.com/{repo_name}")),
                visibility: RepoVisibility::Public,
            },
            payload: EventPayload::PullRequest(PullRequestEvent {
                number: 1,
                title: "Test PR".into(),
                state: PullRequestState::Merged,
                created_at: Utc::now(),
                merged_at: Some(Utc::now()),
                additions: Some(10),
                deletions: Some(2),
                changed_files: Some(3),
                touched_paths_hint: vec![],
                window: None,
            }),
            tags: vec![],
            links: vec![],
            source: SourceRef {
                system: SourceSystem::JsonImport,
                url: None,
                opaque_id: None,
            },
        }
    }

    fn make_test_coverage() -> CoverageManifest {
        CoverageManifest {
            run_id: RunId::now("test"),
            generated_at: Utc::now(),
            user: "testuser".into(),
            window: TimeWindow {
                since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
                until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
            },
            mode: "merged".into(),
            sources: vec!["json-import".into()],
            slices: vec![],
            warnings: vec![],
            completeness: Completeness::Complete,
        }
    }

    /// Builds an ingestor whose paths point at the standard bundle file names inside
    /// `dir`. Neither file is created; tests write only what they need.
    fn ingestor_in(dir: &tempfile::TempDir) -> JsonIngestor {
        JsonIngestor {
            events_path: dir.path().join(FILE_LEDGER_EVENTS_JSONL),
            coverage_path: dir.path().join(FILE_COVERAGE_MANIFEST_JSON),
        }
    }

    /// Writes each entry of `lines` followed by a newline to the ingestor's events file.
    fn write_event_lines(ing: &JsonIngestor, lines: &[&str]) {
        let mut f = std::fs::File::create(&ing.events_path).unwrap();
        for line in lines {
            writeln!(f, "{line}").unwrap();
        }
    }

    /// Serializes a fresh test manifest into the ingestor's coverage file.
    fn write_valid_coverage(ing: &JsonIngestor) {
        let json = serde_json::to_string(&make_test_coverage()).unwrap();
        std::fs::write(&ing.coverage_path, json).unwrap();
    }

    fn event_json(repo_name: &str, event_id: &str) -> String {
        serde_json::to_string(&make_test_event(repo_name, event_id)).unwrap()
    }

    #[test]
    fn valid_jsonl_roundtrip() {
        let temp = tempfile::tempdir().unwrap();
        let ing = ingestor_in(&temp);
        let ev1 = event_json("org/repo1", "ev1");
        let ev2 = event_json("org/repo2", "ev2");
        write_event_lines(&ing, &[&ev1, &ev2]);
        write_valid_coverage(&ing);

        let output = ing.ingest().unwrap();
        assert_eq!(output.events.len(), 2);
        assert_eq!(output.events[0].repo.full_name, "org/repo1");
        assert_eq!(output.events[1].repo.full_name, "org/repo2");
        assert_eq!(output.coverage.user, "testuser");

        // Every successful ingest reports exactly one freshness record for the import.
        assert_eq!(output.freshness.len(), 1);
        assert_eq!(output.freshness[0].source, "json_import");
        assert!(matches!(output.freshness[0].status, FreshnessStatus::Fresh));
        assert!(output.freshness[0].fetched_at.is_some());
    }

    #[test]
    fn empty_events_file_yields_no_events() {
        let temp = tempfile::tempdir().unwrap();
        let ing = ingestor_in(&temp);
        write_event_lines(&ing, &[]);
        write_valid_coverage(&ing);

        let output = ing.ingest().unwrap();
        assert!(output.events.is_empty());
    }

    #[test]
    fn missing_events_file_returns_error() {
        let temp = tempfile::tempdir().unwrap();
        let ing = ingestor_in(&temp);
        write_valid_coverage(&ing);

        let err_msg = format!("{:#}", ing.ingest().unwrap_err());
        assert!(
            err_msg.starts_with("read "),
            "Expected a read error for the events file, got: {err_msg}"
        );
    }

    #[test]
    fn missing_coverage_file_returns_error() {
        let temp = tempfile::tempdir().unwrap();
        let ing = ingestor_in(&temp);
        let ev = event_json("org/repo", "ev1");
        write_event_lines(&ing, &[&ev]);

        let err_msg = format!("{:#}", ing.ingest().unwrap_err());
        assert!(
            err_msg.starts_with("read "),
            "Expected a read error for the coverage manifest, got: {err_msg}"
        );
    }

    #[test]
    fn malformed_coverage_manifest_returns_error() {
        let temp = tempfile::tempdir().unwrap();
        let ing = ingestor_in(&temp);
        let ev = event_json("org/repo", "ev1");
        write_event_lines(&ing, &[&ev]);
        std::fs::write(&ing.coverage_path, "not json").unwrap();

        let err_msg = format!("{:#}", ing.ingest().unwrap_err());
        assert!(
            err_msg.starts_with("parse coverage manifest"),
            "Expected a coverage parse error, got: {err_msg}"
        );
    }

    #[test]
    fn blank_lines_in_jsonl_are_skipped() {
        let temp = tempfile::tempdir().unwrap();
        let ing = ingestor_in(&temp);
        let ev = event_json("org/repo", "ev1");
        write_event_lines(&ing, &["", &ev, "", "   "]);
        write_valid_coverage(&ing);

        let output = ing.ingest().unwrap();
        assert_eq!(output.events.len(), 1);
    }

    #[test]
    fn invalid_json_line_returns_error_with_line_number() {
        let temp = tempfile::tempdir().unwrap();
        let ing = ingestor_in(&temp);
        let ev = event_json("org/repo", "ev1");
        write_event_lines(&ing, &[&ev, "{not valid json}"]);
        write_valid_coverage(&ing);

        let result = ing.ingest();
        assert!(result.is_err());
        let err_msg = format!("{:#}", result.unwrap_err());
        assert!(
            err_msg.contains("line 2"),
            "Expected error to mention line number, got: {err_msg}"
        );
    }
}