1use anyhow::{Context, Result};
7use chrono::Utc;
8use shiplog_ports::{IngestOutput, Ingestor};
9use shiplog_schema::coverage::CoverageManifest;
10use shiplog_schema::event::EventEnvelope;
11use shiplog_schema::freshness::{FreshnessStatus, SourceFreshness};
12use std::path::PathBuf;
13
/// Ingestor that loads previously exported events and their coverage manifest from two files on
/// disk.
///
/// `Debug` and `Clone` are derived so the configuration can be logged and duplicated; both fields
/// are plain `PathBuf`s, so the derives add no extra requirements.
#[derive(Debug, Clone)]
pub struct JsonIngestor {
    /// Location of the JSONL file containing one serialized event per line.
    pub events_path: PathBuf,
    /// Location of the JSON file containing the coverage manifest.
    pub coverage_path: PathBuf,
}
40
41impl Ingestor for JsonIngestor {
42 fn ingest(&self) -> Result<IngestOutput> {
43 let events = read_events(&self.events_path)?;
44 let coverage = read_coverage(&self.coverage_path)?;
45 let freshness = vec![SourceFreshness {
46 source: "json_import".to_string(),
47 status: FreshnessStatus::Fresh,
48 cache_hits: 0,
49 cache_misses: 0,
50 fetched_at: Some(Utc::now()),
51 reason: None,
52 }];
53 Ok(IngestOutput {
54 events,
55 coverage,
56 freshness,
57 })
58 }
59}
60
61pub fn parse_events_jsonl(text: &str, source: &str) -> Result<Vec<EventEnvelope>> {
80 let mut out = Vec::new();
81 for (i, line) in text.lines().enumerate() {
82 if line.trim().is_empty() {
83 continue;
84 }
85 let ev: EventEnvelope = serde_json::from_str(line)
86 .with_context(|| format!("parse event json line {} in {source}", i + 1))?;
87 out.push(ev);
88 }
89 Ok(out)
90}
91
92fn read_events(path: &PathBuf) -> Result<Vec<EventEnvelope>> {
93 let text = std::fs::read_to_string(path).with_context(|| format!("read {path:?}"))?;
94 parse_events_jsonl(&text, &format!("{path:?}"))
95}
96
97fn read_coverage(path: &PathBuf) -> Result<CoverageManifest> {
98 let text = std::fs::read_to_string(path).with_context(|| format!("read {path:?}"))?;
99 let cov: CoverageManifest =
100 serde_json::from_str(&text).with_context(|| format!("parse coverage manifest {path:?}"))?;
101 Ok(cov)
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use chrono::{NaiveDate, Utc};
108 use shiplog_bundle::{FILE_COVERAGE_MANIFEST_JSON, FILE_LEDGER_EVENTS_JSONL};
109 use shiplog_ids::{EventId, RunId};
110 use shiplog_schema::coverage::{Completeness, CoverageManifest, TimeWindow};
111 use shiplog_schema::event::*;
112 use std::io::Write;
113
114 fn make_test_event(repo_name: &str, event_id: &str) -> EventEnvelope {
115 EventEnvelope {
116 id: EventId::from_parts(["test", event_id]),
117 kind: EventKind::PullRequest,
118 occurred_at: Utc::now(),
119 actor: Actor {
120 login: "testuser".into(),
121 id: None,
122 },
123 repo: RepoRef {
124 full_name: repo_name.into(),
125 html_url: Some(format!("https://github.com/{repo_name}")),
126 visibility: RepoVisibility::Public,
127 },
128 payload: EventPayload::PullRequest(PullRequestEvent {
129 number: 1,
130 title: "Test PR".into(),
131 state: PullRequestState::Merged,
132 created_at: Utc::now(),
133 merged_at: Some(Utc::now()),
134 additions: Some(10),
135 deletions: Some(2),
136 changed_files: Some(3),
137 touched_paths_hint: vec![],
138 window: None,
139 }),
140 tags: vec![],
141 links: vec![],
142 source: SourceRef {
143 system: SourceSystem::JsonImport,
144 url: None,
145 opaque_id: None,
146 },
147 }
148 }
149
150 fn make_test_coverage() -> CoverageManifest {
151 CoverageManifest {
152 run_id: RunId::now("test"),
153 generated_at: Utc::now(),
154 user: "testuser".into(),
155 window: TimeWindow {
156 since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
157 until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
158 },
159 mode: "merged".into(),
160 sources: vec!["json-import".into()],
161 slices: vec![],
162 warnings: vec![],
163 completeness: Completeness::Complete,
164 }
165 }
166
167 #[test]
168 fn valid_jsonl_roundtrip() {
169 let temp = tempfile::tempdir().unwrap();
170 let events_path = temp.path().join(FILE_LEDGER_EVENTS_JSONL);
171 let coverage_path = temp.path().join(FILE_COVERAGE_MANIFEST_JSON);
172
173 let ev1 = make_test_event("org/repo1", "ev1");
174 let ev2 = make_test_event("org/repo2", "ev2");
175 let coverage = make_test_coverage();
176
177 {
179 let mut f = std::fs::File::create(&events_path).unwrap();
180 writeln!(f, "{}", serde_json::to_string(&ev1).unwrap()).unwrap();
181 writeln!(f, "{}", serde_json::to_string(&ev2).unwrap()).unwrap();
182 }
183 std::fs::write(&coverage_path, serde_json::to_string(&coverage).unwrap()).unwrap();
184
185 let ing = JsonIngestor {
186 events_path,
187 coverage_path,
188 };
189 let output = ing.ingest().unwrap();
190 assert_eq!(output.events.len(), 2);
191 assert_eq!(output.events[0].repo.full_name, "org/repo1");
192 assert_eq!(output.events[1].repo.full_name, "org/repo2");
193 assert_eq!(output.coverage.user, "testuser");
194 }
195
196 #[test]
197 fn missing_events_file_returns_error() {
198 let temp = tempfile::tempdir().unwrap();
199 let events_path = temp.path().join("nonexistent.jsonl");
200 let coverage_path = temp.path().join(FILE_COVERAGE_MANIFEST_JSON);
201
202 let coverage = make_test_coverage();
203 std::fs::write(&coverage_path, serde_json::to_string(&coverage).unwrap()).unwrap();
204
205 let ing = JsonIngestor {
206 events_path,
207 coverage_path,
208 };
209 let result = ing.ingest();
210 assert!(result.is_err());
211 }
212
213 #[test]
214 fn blank_lines_in_jsonl_are_skipped() {
215 let temp = tempfile::tempdir().unwrap();
216 let events_path = temp.path().join(FILE_LEDGER_EVENTS_JSONL);
217 let coverage_path = temp.path().join(FILE_COVERAGE_MANIFEST_JSON);
218
219 let ev = make_test_event("org/repo", "ev1");
220 let coverage = make_test_coverage();
221
222 {
224 let mut f = std::fs::File::create(&events_path).unwrap();
225 writeln!(f).unwrap();
226 writeln!(f, "{}", serde_json::to_string(&ev).unwrap()).unwrap();
227 writeln!(f).unwrap();
228 writeln!(f, " ").unwrap();
229 }
230 std::fs::write(&coverage_path, serde_json::to_string(&coverage).unwrap()).unwrap();
231
232 let ing = JsonIngestor {
233 events_path,
234 coverage_path,
235 };
236 let output = ing.ingest().unwrap();
237 assert_eq!(output.events.len(), 1);
238 }
239
240 #[test]
241 fn invalid_json_line_returns_error_with_line_number() {
242 let temp = tempfile::tempdir().unwrap();
243 let events_path = temp.path().join(FILE_LEDGER_EVENTS_JSONL);
244 let coverage_path = temp.path().join(FILE_COVERAGE_MANIFEST_JSON);
245
246 let ev = make_test_event("org/repo", "ev1");
247 let coverage = make_test_coverage();
248
249 {
250 let mut f = std::fs::File::create(&events_path).unwrap();
251 writeln!(f, "{}", serde_json::to_string(&ev).unwrap()).unwrap();
252 writeln!(f, "{{not valid json}}").unwrap();
253 }
254 std::fs::write(&coverage_path, serde_json::to_string(&coverage).unwrap()).unwrap();
255
256 let ing = JsonIngestor {
257 events_path,
258 coverage_path,
259 };
260 let result = ing.ingest();
261 assert!(result.is_err());
262 let err_msg = format!("{:#}", result.unwrap_err());
263 assert!(
264 err_msg.contains("line 2"),
265 "Expected error to mention line number, got: {err_msg}"
266 );
267 }
268}