// shiplog_engine/lib.rs

//! Orchestration engine for the shiplog pipeline.
//!
//! Wires together ingestors, clusterers, redactors, and renderers to drive the
//! `collect`, `render`, `refresh`, and `run` commands. This is the main
//! coordination layer between the CLI and the microcrate adapters.
6
7use anyhow::{Context, Result};
8use shiplog_bundle::{write_bundle_manifest, write_zip};
9use shiplog_ports::{IngestOutput, Redactor, Renderer, WorkstreamClusterer};
10use shiplog_render_json::{write_coverage_manifest, write_events_jsonl};
11use shiplog_schema::bundle::BundleProfile;
12use shiplog_schema::coverage::CoverageManifest;
13use shiplog_schema::event::EventEnvelope;
14use shiplog_schema::workstream::WorkstreamsFile;
15use shiplog_workstreams::WorkstreamManager;
16use std::path::{Path, PathBuf};
17
/// Pipeline coordinator that borrows its adapters as trait objects.
///
/// The engine does not own the adapters; each reference must outlive `'a`.
pub struct Engine<'a> {
    /// Renders the packet markdown (called through `render_packet_markdown`).
    pub renderer: &'a dyn Renderer,
    /// Handed to `WorkstreamManager::load_effective` when workstreams are loaded.
    pub clusterer: &'a dyn WorkstreamClusterer,
    /// Redacts events and workstreams per profile before a profile packet is rendered.
    pub redactor: &'a dyn Redactor,
}
23
/// Paths of the artifacts produced by one engine invocation.
///
/// Every field is a plain path, so the type derives the usual value traits;
/// callers can log it with `{:?}` and compare it in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunOutputs {
    /// Output directory the run wrote into.
    pub out_dir: PathBuf,
    /// Main packet (`packet.md` inside `out_dir`).
    pub packet_md: PathBuf,
    /// Workstreams file associated with the run (curated or suggested).
    pub workstreams_yaml: PathBuf,
    /// Event ledger (`ledger.events.jsonl` inside `out_dir`).
    pub ledger_events_jsonl: PathBuf,
    /// Coverage manifest (`coverage.manifest.json` inside `out_dir`).
    pub coverage_manifest_json: PathBuf,
    /// Bundle manifest (`bundle.manifest.json` inside `out_dir`).
    pub bundle_manifest_json: PathBuf,
    /// Zip archive path; `None` when no archive was requested.
    pub zip_path: Option<PathBuf>,
}
33
/// Which kind of workstream file a run used or created.
///
/// A fieldless tag, so it is cheap to copy and can be compared and printed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkstreamSource {
    /// User-curated workstreams.yaml
    Curated,
    /// Machine-generated workstreams.suggested.yaml
    Suggested,
    /// Newly generated from events
    Generated,
}
43
44impl<'a> Engine<'a> {
45    pub fn new(
46        renderer: &'a dyn Renderer,
47        clusterer: &'a dyn WorkstreamClusterer,
48        redactor: &'a dyn Redactor,
49    ) -> Self {
50        Self {
51            renderer,
52            clusterer,
53            redactor,
54        }
55    }
56
57    /// Run the full pipeline: ingest → cluster → render
58    ///
59    /// Uses WorkstreamManager to respect user-curated workstreams.
60    pub fn run(
61        &self,
62        ingest: IngestOutput,
63        user: &str,
64        window_label: &str,
65        out_dir: &Path,
66        zip: bool,
67        bundle_profile: &BundleProfile,
68    ) -> Result<(RunOutputs, WorkstreamSource)> {
69        std::fs::create_dir_all(out_dir).with_context(|| format!("create {out_dir:?}"))?;
70
71        let events = ingest.events;
72        let coverage = ingest.coverage;
73
74        // Use WorkstreamManager to load or generate workstreams
75        let (workstreams, ws_source) = self.load_workstreams(out_dir, &events)?;
76
77        // Write canonical outputs
78        let ledger_path = out_dir.join("ledger.events.jsonl");
79        let coverage_path = out_dir.join("coverage.manifest.json");
80        let packet_path = out_dir.join("packet.md");
81
82        write_events_jsonl(&ledger_path, &events)?;
83        write_coverage_manifest(&coverage_path, &coverage)?;
84
85        // Note: workstreams.yaml is user-owned; we don't overwrite it
86        // workstreams.suggested.yaml is already written by WorkstreamManager if needed
87        let ws_path = match ws_source {
88            WorkstreamSource::Curated => WorkstreamManager::curated_path(out_dir),
89            WorkstreamSource::Suggested => WorkstreamManager::suggested_path(out_dir),
90            WorkstreamSource::Generated => WorkstreamManager::suggested_path(out_dir),
91        };
92
93        let packet = self.renderer.render_packet_markdown(
94            user,
95            window_label,
96            &events,
97            &workstreams,
98            &coverage,
99        )?;
100        std::fs::write(&packet_path, packet)?;
101
102        // Render profiles
103        self.render_profile(
104            "manager",
105            user,
106            window_label,
107            out_dir,
108            &events,
109            &workstreams,
110            &coverage,
111        )?;
112        self.render_profile(
113            "public",
114            user,
115            window_label,
116            out_dir,
117            &events,
118            &workstreams,
119            &coverage,
120        )?;
121
122        // Bundle manifest + zip
123        let run_id = &coverage.run_id;
124        let _bundle = write_bundle_manifest(out_dir, run_id, bundle_profile)?;
125        let zip_path = if zip {
126            let z = zip_path_for_profile(out_dir, bundle_profile);
127            write_zip(out_dir, &z, bundle_profile)?;
128            Some(z)
129        } else {
130            None
131        };
132
133        Ok((
134            RunOutputs {
135                out_dir: out_dir.to_path_buf(),
136                packet_md: packet_path,
137                workstreams_yaml: ws_path,
138                ledger_events_jsonl: ledger_path,
139                coverage_manifest_json: coverage_path,
140                bundle_manifest_json: out_dir.join("bundle.manifest.json"),
141                zip_path,
142            },
143            ws_source,
144        ))
145    }
146
147    /// Load workstreams using WorkstreamManager
148    fn load_workstreams(
149        &self,
150        out_dir: &Path,
151        events: &[EventEnvelope],
152    ) -> Result<(WorkstreamsFile, WorkstreamSource)> {
153        let curated_exists = WorkstreamManager::has_curated(out_dir);
154        let suggested_exists = WorkstreamManager::suggested_path(out_dir).exists();
155
156        let ws = WorkstreamManager::load_effective(out_dir, self.clusterer, events)?;
157
158        let source = if curated_exists {
159            WorkstreamSource::Curated
160        } else if suggested_exists {
161            WorkstreamSource::Suggested
162        } else {
163            WorkstreamSource::Generated
164        };
165
166        Ok((ws, source))
167    }
168
169    /// Import a pre-built ledger and run the full render pipeline.
170    ///
171    /// When `workstreams` is `Some`, uses them directly (writes as curated).
172    /// When `None`, falls through to normal clustering.
173    #[allow(clippy::too_many_arguments)]
174    pub fn import(
175        &self,
176        ingest: IngestOutput,
177        user: &str,
178        window_label: &str,
179        out_dir: &Path,
180        zip: bool,
181        workstreams: Option<WorkstreamsFile>,
182        bundle_profile: &BundleProfile,
183    ) -> Result<(RunOutputs, WorkstreamSource)> {
184        std::fs::create_dir_all(out_dir).with_context(|| format!("create {out_dir:?}"))?;
185
186        let events = ingest.events;
187        let coverage = ingest.coverage;
188
189        // Use provided workstreams or generate new ones
190        let (ws, ws_source) = if let Some(ws) = workstreams {
191            // Write imported workstreams as curated
192            let curated_path = WorkstreamManager::curated_path(out_dir);
193            shiplog_workstreams::write_workstreams(&curated_path, &ws)?;
194            (ws, WorkstreamSource::Curated)
195        } else {
196            self.load_workstreams(out_dir, &events)?
197        };
198
199        // Write canonical outputs
200        let ledger_path = out_dir.join("ledger.events.jsonl");
201        let coverage_path = out_dir.join("coverage.manifest.json");
202        let packet_path = out_dir.join("packet.md");
203
204        write_events_jsonl(&ledger_path, &events)?;
205        write_coverage_manifest(&coverage_path, &coverage)?;
206
207        let ws_path = match ws_source {
208            WorkstreamSource::Curated => WorkstreamManager::curated_path(out_dir),
209            WorkstreamSource::Suggested => WorkstreamManager::suggested_path(out_dir),
210            WorkstreamSource::Generated => WorkstreamManager::suggested_path(out_dir),
211        };
212
213        let packet =
214            self.renderer
215                .render_packet_markdown(user, window_label, &events, &ws, &coverage)?;
216        std::fs::write(&packet_path, packet)?;
217
218        // Render profiles
219        self.render_profile(
220            "manager",
221            user,
222            window_label,
223            out_dir,
224            &events,
225            &ws,
226            &coverage,
227        )?;
228        self.render_profile(
229            "public",
230            user,
231            window_label,
232            out_dir,
233            &events,
234            &ws,
235            &coverage,
236        )?;
237
238        // Bundle manifest + zip
239        let run_id = &coverage.run_id;
240        let _bundle = write_bundle_manifest(out_dir, run_id, bundle_profile)?;
241        let zip_path = if zip {
242            let z = zip_path_for_profile(out_dir, bundle_profile);
243            write_zip(out_dir, &z, bundle_profile)?;
244            Some(z)
245        } else {
246            None
247        };
248
249        Ok((
250            RunOutputs {
251                out_dir: out_dir.to_path_buf(),
252                packet_md: packet_path,
253                workstreams_yaml: ws_path,
254                ledger_events_jsonl: ledger_path,
255                coverage_manifest_json: coverage_path,
256                bundle_manifest_json: out_dir.join("bundle.manifest.json"),
257                zip_path,
258            },
259            ws_source,
260        ))
261    }
262
263    /// Refresh receipts and stats without regenerating workstreams
264    ///
265    /// This preserves user curation while updating event data.
266    pub fn refresh(
267        &self,
268        ingest: IngestOutput,
269        user: &str,
270        window_label: &str,
271        out_dir: &Path,
272        zip: bool,
273        bundle_profile: &BundleProfile,
274    ) -> Result<RunOutputs> {
275        std::fs::create_dir_all(out_dir).with_context(|| format!("create {out_dir:?}"))?;
276
277        let events = ingest.events;
278        let coverage = ingest.coverage;
279
280        // Load existing workstreams — error if none exist
281        let workstreams = if WorkstreamManager::has_curated(out_dir) {
282            let path = WorkstreamManager::curated_path(out_dir);
283            let text = std::fs::read_to_string(&path)
284                .with_context(|| format!("read curated workstreams from {path:?}"))?;
285            serde_yaml::from_str(&text)
286                .with_context(|| format!("parse curated workstreams yaml {path:?}"))?
287        } else {
288            let suggested_path = WorkstreamManager::suggested_path(out_dir);
289            if suggested_path.exists() {
290                let text = std::fs::read_to_string(&suggested_path).with_context(|| {
291                    format!("read suggested workstreams from {suggested_path:?}")
292                })?;
293                serde_yaml::from_str(&text).with_context(|| {
294                    format!("parse suggested workstreams yaml {suggested_path:?}")
295                })?
296            } else {
297                anyhow::bail!(
298                    "No workstreams found. Run `shiplog collect` first to generate workstreams."
299                );
300            }
301        };
302
303        // Write canonical outputs
304        let ledger_path = out_dir.join("ledger.events.jsonl");
305        let coverage_path = out_dir.join("coverage.manifest.json");
306        let packet_path = out_dir.join("packet.md");
307
308        write_events_jsonl(&ledger_path, &events)?;
309        write_coverage_manifest(&coverage_path, &coverage)?;
310
311        let ws_path = if WorkstreamManager::has_curated(out_dir) {
312            WorkstreamManager::curated_path(out_dir)
313        } else {
314            WorkstreamManager::suggested_path(out_dir)
315        };
316
317        let packet = self.renderer.render_packet_markdown(
318            user,
319            window_label,
320            &events,
321            &workstreams,
322            &coverage,
323        )?;
324        std::fs::write(&packet_path, packet)?;
325
326        // Render profiles
327        self.render_profile(
328            "manager",
329            user,
330            window_label,
331            out_dir,
332            &events,
333            &workstreams,
334            &coverage,
335        )?;
336        self.render_profile(
337            "public",
338            user,
339            window_label,
340            out_dir,
341            &events,
342            &workstreams,
343            &coverage,
344        )?;
345
346        // Bundle manifest + zip
347        let run_id = &coverage.run_id;
348        let _bundle = write_bundle_manifest(out_dir, run_id, bundle_profile)?;
349        let zip_path = if zip {
350            let z = zip_path_for_profile(out_dir, bundle_profile);
351            write_zip(out_dir, &z, bundle_profile)?;
352            Some(z)
353        } else {
354            None
355        };
356
357        Ok(RunOutputs {
358            out_dir: out_dir.to_path_buf(),
359            packet_md: packet_path,
360            workstreams_yaml: ws_path,
361            ledger_events_jsonl: ledger_path,
362            coverage_manifest_json: coverage_path,
363            bundle_manifest_json: out_dir.join("bundle.manifest.json"),
364            zip_path,
365        })
366    }
367
368    #[allow(clippy::too_many_arguments)]
369    fn render_profile(
370        &self,
371        profile: &str,
372        user: &str,
373        window_label: &str,
374        out_dir: &Path,
375        events: &[EventEnvelope],
376        workstreams: &WorkstreamsFile,
377        coverage: &CoverageManifest,
378    ) -> Result<()> {
379        let prof_dir = out_dir.join("profiles").join(profile);
380        std::fs::create_dir_all(&prof_dir)?;
381
382        let red_events = self.redactor.redact_events(events, profile)?;
383        let red_ws = self.redactor.redact_workstreams(workstreams, profile)?;
384
385        let md = self.renderer.render_packet_markdown(
386            user,
387            window_label,
388            &red_events,
389            &red_ws,
390            coverage,
391        )?;
392        std::fs::write(prof_dir.join("packet.md"), md)?;
393        Ok(())
394    }
395}
396
397/// Compute the zip file path based on bundle profile.
398/// `Internal` -> `<run_dir>.zip`, others -> `<run_dir>.<profile>.zip`.
399fn zip_path_for_profile(out_dir: &Path, profile: &BundleProfile) -> PathBuf {
400    match profile {
401        BundleProfile::Internal => out_dir.with_extension("zip"),
402        _ => {
403            let stem = out_dir.file_name().unwrap_or_default().to_string_lossy();
404            let name = format!("{}.{}.zip", stem, profile.as_str());
405            out_dir.with_file_name(name)
406        }
407    }
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{NaiveDate, TimeZone, Utc};
    use shiplog_ids::{EventId, RunId};
    use shiplog_ports::IngestOutput;
    use shiplog_schema::coverage::{Completeness, CoverageManifest, TimeWindow};
    use shiplog_schema::event::*;

    /// Window label passed to the engine by the pipeline tests.
    const WINDOW_LABEL: &str = "2025-01-01..2025-02-01";

    /// Unix epoch in UTC; the single timestamp used by every fixture.
    fn epoch() -> chrono::DateTime<Utc> {
        Utc.timestamp_opt(0, 0).unwrap()
    }

    /// The 2025-01-01 to 2025-02-01 window shared by events and coverage.
    fn fixture_window() -> TimeWindow {
        TimeWindow {
            since: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
            until: NaiveDate::from_ymd_opt(2025, 2, 1).unwrap(),
        }
    }

    /// Build a merged pull-request event for `repo` with the given number and title.
    fn pr_event(repo: &str, number: u64, title: &str) -> EventEnvelope {
        let payload = EventPayload::PullRequest(PullRequestEvent {
            number,
            title: title.to_string(),
            state: PullRequestState::Merged,
            created_at: epoch(),
            merged_at: Some(epoch()),
            additions: Some(1),
            deletions: Some(0),
            changed_files: Some(1),
            touched_paths_hint: vec![],
            window: Some(fixture_window()),
        });

        EventEnvelope {
            id: EventId::from_parts(["github", "pr", repo, &number.to_string()]),
            kind: EventKind::PullRequest,
            occurred_at: epoch(),
            actor: Actor {
                login: "user".into(),
                id: None,
            },
            repo: RepoRef {
                full_name: repo.to_string(),
                html_url: Some(format!("https://github.com/{repo}")),
                visibility: RepoVisibility::Unknown,
            },
            payload,
            tags: vec![],
            links: vec![Link {
                label: "pr".into(),
                url: format!("https://github.com/{repo}/pull/{number}"),
            }],
            source: SourceRef {
                system: SourceSystem::Github,
                url: Some("https://api.github.com/...".into()),
                opaque_id: None,
            },
        }
    }

    /// Two PR events in one repo plus a complete coverage manifest.
    fn test_ingest() -> IngestOutput {
        IngestOutput {
            events: vec![
                pr_event("acme/foo", 1, "Add feature"),
                pr_event("acme/foo", 2, "Fix bug"),
            ],
            coverage: CoverageManifest {
                run_id: RunId("test_run_1".into()),
                generated_at: epoch(),
                user: "tester".into(),
                window: fixture_window(),
                mode: "merged".into(),
                sources: vec!["github".into()],
                slices: vec![],
                warnings: vec![],
                completeness: Completeness::Complete,
            },
        }
    }

    /// Engine wired to the real adapters; they are leaked to obtain `'static` borrows.
    fn test_engine() -> Engine<'static> {
        let renderer: &'static dyn shiplog_ports::Renderer =
            Box::leak(Box::new(shiplog_render_md::MarkdownRenderer));
        let clusterer: &'static dyn shiplog_ports::WorkstreamClusterer =
            Box::leak(Box::new(shiplog_workstreams::RepoClusterer));
        let redactor: &'static dyn shiplog_ports::Redactor = Box::leak(Box::new(
            shiplog_redact::DeterministicRedactor::new(b"test-key"),
        ));
        Engine::new(renderer, clusterer, redactor)
    }

    #[test]
    fn run_creates_expected_output_files() {
        let tmp = tempfile::tempdir().unwrap();
        let out_dir = tmp.path().join("test_run_1");

        let (outputs, _) = test_engine()
            .run(
                test_ingest(),
                "tester",
                WINDOW_LABEL,
                &out_dir,
                false,
                &BundleProfile::Internal,
            )
            .unwrap();

        assert!(outputs.packet_md.exists(), "packet.md missing");
        assert!(
            outputs.ledger_events_jsonl.exists(),
            "ledger.events.jsonl missing"
        );
        assert!(
            outputs.coverage_manifest_json.exists(),
            "coverage.manifest.json missing"
        );
        assert!(
            outputs.bundle_manifest_json.exists(),
            "bundle.manifest.json missing"
        );

        let manager_packet = out_dir.join("profiles/manager/packet.md");
        assert!(manager_packet.exists(), "manager profile missing");

        let public_packet = out_dir.join("profiles/public/packet.md");
        assert!(public_packet.exists(), "public profile missing");
    }

    #[test]
    fn run_with_zip_creates_archive() {
        let tmp = tempfile::tempdir().unwrap();
        let out_dir = tmp.path().join("test_run_zip");

        let (outputs, _) = test_engine()
            .run(
                test_ingest(),
                "tester",
                WINDOW_LABEL,
                &out_dir,
                true,
                &BundleProfile::Internal,
            )
            .unwrap();

        assert!(
            outputs.zip_path.is_some(),
            "zip_path should be Some when zip=true"
        );
        let archive = outputs.zip_path.as_ref().unwrap();
        assert!(archive.exists(), "zip file missing");
    }

    #[test]
    fn zip_path_internal_uses_plain_extension() {
        let run_dir = Path::new("/tmp/run_123");
        let zip = zip_path_for_profile(run_dir, &BundleProfile::Internal);
        assert_eq!(zip, Path::new("/tmp/run_123.zip"));
    }

    #[test]
    fn zip_path_manager_includes_profile_name() {
        let run_dir = Path::new("/tmp/run_123");
        let zip = zip_path_for_profile(run_dir, &BundleProfile::Manager);
        assert_eq!(zip, Path::new("/tmp/run_123.manager.zip"));
    }
}
576}